activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [37/50] [abbrv] activemq-artemis git commit: Added ANYCAST routing to local queues
Date Mon, 07 Nov 2016 16:36:57 GMT
Added ANYCAST routing to local queues


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3aa84a99
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3aa84a99
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3aa84a99

Branch: refs/heads/ARTEMIS-780
Commit: 3aa84a99adef1088c7a16057380c193f2b0f40ae
Parents: bab49b8
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Oct 24 14:27:00 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500

----------------------------------------------------------------------
 .../artemis/core/postoffice/AddressManager.java |   2 +
 .../artemis/core/postoffice/PostOffice.java     |   2 +
 .../core/postoffice/impl/BindingsImpl.java      |   1 +
 .../core/postoffice/impl/LocalQueueBinding.java |   9 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   5 +
 .../postoffice/impl/SimpleAddressManager.java   |  15 ++
 .../artemis/core/server/ActiveMQServer.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  12 +-
 .../artemis/core/server/impl/AddressInfo.java   |  12 +-
 .../server/impl/PostOfficeJournalLoader.java    |   3 +-
 .../core/server/impl/QueueFactoryImpl.java      |   8 +
 .../core/config/impl/FileConfigurationTest.java |   4 +-
 .../integration/addressing/AddressingTest.java  | 240 ++++++++++++++++++-
 .../integration/client/HangConsumerTest.java    |   2 +-
 .../jms/client/TopicCleanupTest.java            |   2 +-
 .../core/server/impl/fakes/FakePostOffice.java  |   5 +
 16 files changed, 300 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 5519822..1cf1a07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -54,6 +54,8 @@ public interface AddressManager {
 
    AddressInfo addAddressInfo(AddressInfo addressInfo);
 
+   AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
+
    AddressInfo removeAddressInfo(SimpleString address);
 
    AddressInfo getAddressInfo(SimpleString address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index f719966..7902352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -45,6 +45,8 @@ public interface PostOffice extends ActiveMQComponent {
 
    AddressInfo addAddressInfo(AddressInfo addressInfo);
 
+   AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
+
    AddressInfo removeAddressInfo(SimpleString address);
 
    AddressInfo getAddressInfo(SimpleString address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index e5df737..6be0311 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -262,6 +262,7 @@ public final class BindingsImpl implements Bindings {
       boolean routed = false;
 
       for (Binding binding : exclusiveBindings) {
+
          if (binding.getFilter() == null || binding.getFilter().match(message)) {
             binding.getBindable().route(message, context);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 2a6d9c5..2921388 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -24,10 +24,11 @@ import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 
 public class LocalQueueBinding implements QueueBinding {
 
-   private final SimpleString address;
+   private final AddressInfo address;
 
    private final Queue queue;
 
@@ -37,7 +38,7 @@ public class LocalQueueBinding implements QueueBinding {
 
    private final SimpleString clusterName;
 
-   public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) {
+   public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) {
       this.address = address;
 
       this.queue = queue;
@@ -61,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public SimpleString getAddress() {
-      return address;
+      return address.getName();
    }
 
    @Override
@@ -76,7 +77,7 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public SimpleString getRoutingName() {
-      return name;
+      return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9b7ed0c..6c654bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -425,6 +425,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
+   public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+      return addressManager.addOrUpdateAddressInfo(addressInfo);
+   }
+
+   @Override
    public AddressInfo removeAddressInfo(SimpleString address) {
       return addressManager.removeAddressInfo(address);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 2994f9e..969a1a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -188,6 +188,21 @@ public class SimpleAddressManager implements AddressManager {
    }
 
    @Override
+   public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+      AddressInfo from = addAddressInfo(addressInfo);
+      return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo);
+   }
+
+   private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
+      synchronized (from) {
+         from.setRoutingType(to.getRoutingType());
+         from.setDefaultMaxConsumers(to.getDefaultMaxConsumers());
+         from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers());
+         return from;
+      }
+   }
+
+   @Override
    public AddressInfo removeAddressInfo(SimpleString address) {
       return addressInfoMap.remove(address);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 01fa89a..a6256d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -417,7 +417,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    void removeClientConnection(String clientId);
 
-   AddressInfo addAddressInfo(AddressInfo addressInfo);
+   AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo);
 
    AddressInfo removeAddressInfo(SimpleString address);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 4c5a0d6..375e678 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2094,7 +2094,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers());
          info.setDefaultMaxConsumers(config.getDefaultMaxConsumers());
 
-         addAddressInfo(info);
+         createOrUpdateAddressInfo(info);
          deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
       }
    }
@@ -2198,8 +2198,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
-   public AddressInfo addAddressInfo(AddressInfo addressInfo) {
-      return postOffice.addAddressInfo(addressInfo);
+   public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) {
+      return postOffice.addOrUpdateAddressInfo(addressInfo);
    }
 
    @Override
@@ -2209,7 +2209,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    @Override
    public AddressInfo getAddressInfo(SimpleString address) {
-      return postOffice.removeAddressInfo(address);
+      return postOffice.getAddressInfo(address);
    }
 
    private Queue createQueue(final SimpleString addressName,
@@ -2245,15 +2245,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
       final Queue queue = queueFactory.createQueueWith(queueConfig);
 
-      addAddressInfo(new AddressInfo(queue.getAddress()));
-
       if (transientQueue) {
          queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
       } else if (queue.isAutoCreated()) {
          queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
       }
 
-      final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+      final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
 
       if (queue.isDurable()) {
          storageManager.addQueueBinding(txID, localQueueBinding);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4c6ec1f..1449107 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -22,7 +22,7 @@ public class AddressInfo {
 
    private final SimpleString name;
 
-   private RoutingType routingType = RoutingType.Multicast;
+   private RoutingType routingType = RoutingType.MULTICAST;
 
    private boolean defaultDeleteOnNoConsumers;
 
@@ -61,13 +61,13 @@ public class AddressInfo {
    }
 
    public enum RoutingType {
-      Multicast, Anycast;
+      MULTICAST, ANYCAST;
 
       public byte getType() {
          switch (this) {
-            case Multicast:
+            case MULTICAST:
                return 0;
-            case Anycast:
+            case ANYCAST:
                return 1;
             default:
                return -1;
@@ -77,9 +77,9 @@ public class AddressInfo {
       public static RoutingType getType(byte type) {
          switch (type) {
             case 0:
-               return Multicast;
+               return MULTICAST;
             case 1:
-               return Anycast;
+               return ANYCAST;
             default:
                return null;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 9a8ae74..71c5b2b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
             }
          }
 
-         final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+         final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
+
          queues.put(queue.getID(), queue);
          postOffice.addBinding(binding);
          managementService.registerAddress(queue.getAddress());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 5686c7b..3678553 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -68,6 +68,10 @@ public class QueueFactoryImpl implements QueueFactory {
 
    @Override
    public Queue createQueueWith(final QueueConfig config) {
+
+      // Add default address info if one doesn't exist
+      postOffice.addAddressInfo(new AddressInfo(config.address()));
+
       final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
       final Queue queue;
       if (addressSettings.isLastValueQueue()) {
@@ -89,6 +93,10 @@ public class QueueFactoryImpl implements QueueFactory {
                             final boolean durable,
                             final boolean temporary,
                             final boolean autoCreated) {
+
+      // Add default address info if one doesn't exist
+      postOffice.addAddressInfo(new AddressInfo(address));
+
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
 
       Queue queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 214070e..c1639c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -369,7 +369,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       // Addr 1
       CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0);
       assertEquals("addr1", addressConfiguration.getName());
-      assertEquals(AddressInfo.RoutingType.Anycast, addressConfiguration.getRoutingType());
+      assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType());
       assertEquals(2, addressConfiguration.getQueueConfigurations().size());
 
       // Addr 1 Queue 1
@@ -395,7 +395,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       // Addr 2
       addressConfiguration = conf.getAddressConfigurations().get(1);
       assertEquals("addr2", addressConfiguration.getName());
-      assertEquals(AddressInfo.RoutingType.Multicast, addressConfiguration.getRoutingType());
+      assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType());
       assertEquals(2, addressConfiguration.getQueueConfigurations().size());
 
       // Addr 2 Queue 1

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 43d6071..2e0fda4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -16,6 +16,244 @@
  */
 package org.apache.activemq.artemis.tests.integration.addressing;
 
-public class AddressingTest {
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AddressingTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   private ClientSessionFactory sessionFactory;
+
+   @Before
+   public void setup() throws Exception {
+      server = createServer(true);
+      server.start();
+
+      server.waitForActivation(10, TimeUnit.SECONDS);
+
+      ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory = sl.createSessionFactory();
+
+      addSessionFactory(sessionFactory);
+   }
+
+   @Test
+   public void testMulticastRouting() throws Exception {
+
+      SimpleString sendAddress = new SimpleString("test.address");
+
+      List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+      for (String consumeAddress : testAddresses) {
+
+         // For each address, create 2 Queues with the same address, assert both queues receive message
+
+         AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
+         addressInfo.setRoutingType(AddressInfo.RoutingType.MULTICAST);
+
+         server.createOrUpdateAddressInfo(addressInfo);
+         Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+         Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+         ClientSession session = sessionFactory.createSession();
+         session.start();
+
+         ClientConsumer consumer1 = session.createConsumer(q1.getName());
+         ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+         ClientProducer producer = session.createProducer(sendAddress);
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         m.getBodyBuffer().writeString("TestMessage");
+
+         producer.send(m);
+
+         assertNotNull(consumer1.receive(2000));
+         assertNotNull(consumer2.receive(2000));
+
+         q1.deleteQueue();
+         q2.deleteQueue();
+
+         System.out.println(consumeAddress);
+      }
+   }
+
+   @Test
+   public void testAnycastRouting() throws Exception {
+
+      SimpleString sendAddress = new SimpleString("test.address");
+
+      List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+      for (String consumeAddress : testAddresses) {
+
+         // For each address, create 2 Queues with the same address, assert one queue receive message
+
+         AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
+         addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
+
+         server.createOrUpdateAddressInfo(addressInfo);
+         Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+         Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+         ClientSession session = sessionFactory.createSession();
+         session.start();
+
+         ClientConsumer consumer1 = session.createConsumer(q1.getName());
+         ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+         ClientProducer producer = session.createProducer(sendAddress);
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+
+         m.getBodyBuffer().writeString("TestMessage");
+
+         producer.send(m);
+
+         int count = 0;
+         count = (consumer1.receive(1000) == null) ? count : count + 1;
+         count = (consumer2.receive(1000) == null) ? count : count + 1;
+         assertEquals(1, count);
+
+         q1.deleteQueue();
+         q2.deleteQueue();
+
+         System.out.println(consumeAddress);
+      }
+   }
+
+   @Test
+   public void testAnycastRoutingRoundRobin() throws Exception {
+
+      SimpleString address = new SimpleString("test.address");
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
+
+      server.createOrUpdateAddressInfo(addressInfo);
+      Queue q1 = server.createQueue(address, address.concat(".1"), null, true, false);
+      Queue q2 = server.createQueue(address, address.concat(".2"), null, true, false);
+      Queue q3 = server.createQueue(address, address.concat(".3"), null, true, false);
+
+      ClientSession session = sessionFactory.createSession();
+      session.start();
+
+      ClientProducer producer = session.createProducer(address);
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+      ClientConsumer consumer3 = session.createConsumer(q3.getName());
+      List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3}));
+
+      List<String> messages = new ArrayList<>();
+      messages.add("Message1");
+      messages.add("Message2");
+      messages.add("Message3");
+
+      ClientMessage clientMessage;
+      for (String message : messages) {
+         clientMessage = session.createMessage(true);
+         clientMessage.getBodyBuffer().writeString(message);
+         producer.send(clientMessage);
+      }
+
+      String m;
+      for (ClientConsumer consumer : consumers) {
+         clientMessage = consumer.receive(1000);
+         m = clientMessage.getBodyBuffer().readString();
+         messages.remove(m);
+      }
+
+      assertTrue(messages.isEmpty());
+
+      // Check we don't receive more messages
+      int count = 0;
+      for (ClientConsumer consumer : consumers) {
+         count = (consumer.receive(1000) == null) ? count : count + 1;
+      }
+      assertEquals(0, count);
+   }
+
+
+
+   @Test
+   public void testMulticastRoutingBackwardsCompat() throws Exception {
+
+      SimpleString sendAddress = new SimpleString("test.address");
+
+      List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+      for (String consumeAddress : testAddresses) {
+
+         // For each address, create 2 Queues with the same address, assert both queues receive message
+         Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+         Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+         ClientSession session = sessionFactory.createSession();
+         session.start();
+
+         ClientConsumer consumer1 = session.createConsumer(q1.getName());
+         ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+         ClientProducer producer = session.createProducer(sendAddress);
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         m.getBodyBuffer().writeString("TestMessage");
+
+         producer.send(m);
+
+         assertNotNull(consumer1.receive(2000));
+         assertNotNull(consumer2.receive(2000));
+
+         q1.deleteQueue();
+         q2.deleteQueue();
+
+         System.out.println(consumeAddress);
+      }
+   }
+
+   @Test
+   public void testDeleteQueueOnNoConsumersTrue() {
+      fail("Not Implemented");
+   }
+
+   @Test
+   public void testDeleteQueueOnNoConsumersFalse() {
+      fail("Not Implemented");
+   }
+
+   @Test
+   public void testLimitOnMaxConsumers() {
+      fail("Not Implemented");
+   }
+
+   @Test
+   public void testUnlimitedMaxConsumers() {
+      fail("Not Implemented");
+   }
+
+   @Test
+   public void testDefaultMaxConsumersFromAddress() {
+      fail("Not Implemented");
+   }
+
+   @Test
+   public void testDefaultDeleteOnNoConsumersFromAddress() {
+      fail("Not Implemented");
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 83d28a1..2fd5915 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -353,7 +353,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
       long txID = server.getStorageManager().generateID();
 
       // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
-      LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+      LocalQueueBinding newBinding = new LocalQueueBinding(server.getAddressInfo(QUEUE), new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
       server.getStorageManager().addQueueBinding(txID, newBinding);
       server.getStorageManager().commitBindings(txID);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index 280596a..ec279ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -83,7 +83,7 @@ public class TopicCleanupTest extends JMSTestBase {
 
             final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
 
-            LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
+            LocalQueueBinding binding = new LocalQueueBinding(server.getAddressInfo(queue.getAddress()), queue, server.getNodeID());
 
             storage.addQueueBinding(txid, binding);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 9424fc3..512f0f2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -65,6 +65,11 @@ public class FakePostOffice implements PostOffice {
       return null;
    }
 
+   @Override
+   public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+      return null;
+   }
+
 
    @Override
    public AddressInfo removeAddressInfo(SimpleString address) {


Mime
View raw message