activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1612 Fix message redistribution for prefixed addresses
Date Wed, 17 Jan 2018 16:19:37 GMT
ARTEMIS-1612 Fix message redistribution for prefixed addresses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f09bde07
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f09bde07
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f09bde07

Branch: refs/heads/master
Commit: f09bde07dfa56e2c65ccf04adf8e158003fd673b
Parents: 9171d86
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Wed Jan 17 10:24:20 2018 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jan 17 11:19:07 2018 -0500

----------------------------------------------------------------------
 .../core/postoffice/impl/PostOfficeImpl.java    |  2 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |  2 +-
 .../distribution/MessageRedistributionTest.java | 48 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f09bde07/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index e2153e1..d5af56b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -892,7 +892,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       // as described on https://issues.jboss.org/browse/JBPAPP-6130
       Message copyRedistribute = message.copy(storageManager.generateID());
 
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
 
       if (bindings != null) {
          RoutingContext context = new RoutingContextImpl(tx);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f09bde07/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7cd225f..ac2e406 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1243,7 +1243,7 @@ public abstract class ActiveMQTestBase extends Assert {
       }
       if (params == null)
          params = new HashMap<>();
-      return new TransportConfiguration(className, params);
+      return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, Object>());
    }
 
    protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f09bde07/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 2020489..df824c6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -30,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -652,6 +654,52 @@ public class MessageRedistributionTest extends ClusterTestBase {
    }
 
    @Test
+   public void testRedistributionWithPrefixesWhenRemoteConsumerIsAdded() throws Exception {
+
+      for (int i = 0; i <= 2; i++) {
+         ActiveMQServer server = getServer(i);
+         for (TransportConfiguration c : server.getConfiguration().getAcceptorConfigurations()) {
+            c.getExtraParams().putIfAbsent("anycastPrefix", "jms.queue.");
+         }
+      }
+
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      String name = "queues.queue";
+
+      createQueue(0, name, name, null, false, RoutingType.ANYCAST);
+      createQueue(1, name, name, null, false, RoutingType.ANYCAST);
+      createQueue(2, name, name, null, false, RoutingType.ANYCAST);
+
+      addConsumer(0, 0, name, null);
+
+      waitForBindings(0, name, 1, 1, true);
+      waitForBindings(1, name, 1, 0, true);
+      waitForBindings(2, name, 1, 0, true);
+
+      waitForBindings(0, name, 2, 0, false);
+      waitForBindings(1, name, 2, 1, false);
+      waitForBindings(2, name, 2, 1, false);
+
+      removeConsumer(0);
+
+      Thread.sleep(2000);
+
+      send(0, "jms.queue." + name, 20, false, null);
+
+      addConsumer(1, 1, name, null);
+
+      verifyReceiveAll(20, 1);
+      verifyNotReceive(1);
+   }
+
+   @Test
    public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception {
       setupCluster(MessageLoadBalancingType.ON_DEMAND);
 


Mime
View raw message