activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5594 - virtual topics and wildcards
Date Wed, 18 Feb 2015 17:29:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 141ad4cb8 -> 05c311240


https://issues.apache.org/jira/browse/AMQ-5594 - virtual topics and wildcards


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/05c31124
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/05c31124
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/05c31124

Branch: refs/heads/master
Commit: 05c31124021d18db8248f0a260d794e3b5d11823
Parents: 141ad4c
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Wed Feb 18 18:29:05 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Wed Feb 18 18:29:20 2015 +0100

----------------------------------------------------------------------
 .../region/virtual/MappedQueueFilter.java       |  59 ++++---
 .../broker/region/virtual/VirtualTopic.java     |   3 +-
 .../activemq/filter/DestinationMapNode.java     |   9 +-
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 167 ++++++++++++++++---
 .../activemq/filter/DestinationMapTest.java     |   9 +
 ...okerVirtualDestinationsWithWildcardTest.java |   2 +
 6 files changed, 200 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index 38ccf5d..e8de910 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -48,32 +48,34 @@ public class MappedQueueFilter extends DestinationFilter {
         // recover messages for first consumer only
         boolean noSubs = getConsumers().isEmpty();
 
-        super.addSubscription(context, sub);
-
-        if (noSubs && !getConsumers().isEmpty()) {
-            // new subscription added, recover retroactive messages
-            final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
-            final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
-
-            final ActiveMQDestination newDestination = sub.getActiveMQDestination();
-            final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
-
-            for (Destination virtualDest : virtualDests) {
-                if (virtualDest.getActiveMQDestination().isTopic() &&
-                    (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive()))
{
-
-                    Topic topic = (Topic) getBaseDestination(virtualDest);
-                    if (topic != null) {
-                        // re-use browse() to get recovered messages
-                        final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
-
-                        // add recovered messages to subscription
-                        for (Message message : messages) {
-                            final Message copy = message.copy();
-                            copy.setOriginalDestination(message.getDestination());
-                            copy.setDestination(newDestination);
-                            copy.setRegionDestination(regionDest);
-                            sub.addRecoveredMessage(context, newDestination.isQueue() ? new
IndirectMessageReference(copy) : copy);
+        if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination()))
{
+            super.addSubscription(context, sub);
+
+            if (noSubs && !getConsumers().isEmpty()) {
+                // new subscription added, recover retroactive messages
+                final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
+                final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
+
+                final ActiveMQDestination newDestination = sub.getActiveMQDestination();
+                final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+
+                for (Destination virtualDest : virtualDests) {
+                    if (virtualDest.getActiveMQDestination().isTopic() &&
+                            (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive()))
{
+
+                        Topic topic = (Topic) getBaseDestination(virtualDest);
+                        if (topic != null) {
+                            // re-use browse() to get recovered messages
+                            final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
+
+                            // add recovered messages to subscription
+                            for (Message message : messages) {
+                                final Message copy = message.copy();
+                                copy.setOriginalDestination(message.getDestination());
+                                copy.setDestination(newDestination);
+                                copy.setRegionDestination(regionDest);
+                                sub.addRecoveredMessage(context, newDestination.isQueue()
? new IndirectMessageReference(copy) : copy);
+                            }
                         }
                     }
                 }
@@ -99,4 +101,9 @@ public class MappedQueueFilter extends DestinationFilter {
     public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws Exception {
         super.deleteSubscription(context, key);
     }
+
+    @Override
+    public String toString() {
+        return "MappedQueueFilter[" + virtualDestination + ", " + next + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index c6ab07e..769c784 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -91,10 +91,11 @@ public class VirtualTopic implements VirtualDestination {
 
     @Override
     public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination)
throws Exception {
-        if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty())
{
+        if (destination.isQueue() && destination.isPattern()) {
             DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix
+ DestinationFilter.ANY_DESCENDENT));
             if (filter.matches(destination)) {
                 broker.addDestination(context, destination, false);
+
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index bd82a93..4f6ad5a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -112,10 +112,15 @@ public class DestinationMapNode implements DestinationNode {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     protected void removeDesendentValues(Set answer) {
+        ArrayList<DestinationNode> candidates = new ArrayList<>();
         for (Map.Entry<String, DestinationNode> child : childNodes.entrySet()) {
+            candidates.add(child.getValue());
+        }
+
+        for (DestinationNode node : candidates) {
             // remove all the values from the child
-            answer.addAll(child.getValue().removeValues());
-            answer.addAll(child.getValue().removeDesendentValues());
+            answer.addAll(node.removeValues());
+            answer.addAll(node.removeDesendentValues());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index e5e5fe5..263cafc 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -16,40 +16,29 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.util.Wait;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.junit.Assert.*;
 
 public class PahoMQTTTest extends MQTTTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
 
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        protocolConfig = "transport.activeMQSubscriptionPrefetch=32766";
-        super.setUp();
-    }
-
     @Test(timeout = 300000)
     public void testLotsOfClients() throws Exception {
 
@@ -141,6 +130,142 @@ public class PahoMQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 300000)
+        public void testSubs() throws Exception {
+
+        stopBroker();
+        protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
+        startBroker();
+
+        final DefaultListener listener = new DefaultListener();
+        // subscriber connects and creates durable sub
+        MqttClient client = createClient(false, "receive", listener);
+
+        final String ACCOUNT_PREFIX     = "test/";
+
+
+       client.subscribe(ACCOUNT_PREFIX+"1/2/3");
+       client.subscribe(ACCOUNT_PREFIX+"a/+/#");
+       client.subscribe(ACCOUNT_PREFIX+"#");
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       String expectedResult = "should get everything";
+       client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
+
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+       assertEquals(expectedResult, listener.result);
+    }
+
+    @Test(timeout=300000)
+    public void testOverlappingTopics() throws Exception {
+
+        stopBroker();
+        protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
+        startBroker();
+
+        final DefaultListener listener = new DefaultListener();
+        // subscriber connects and creates durable sub
+        MqttClient client = createClient(false, "receive", listener);
+
+        final String ACCOUNT_PREFIX     = "test/";
+
+       // *****************************************
+       // check a simple # subscribe works
+       // *****************************************
+       client.subscribe(ACCOUNT_PREFIX+"#");
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+       String expectedResult = "hello mqtt broker on hash";
+       client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertEquals(expectedResult, listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       expectedResult = "hello mqtt broker on a different topic";
+       listener.result = null;
+       client.publish(ACCOUNT_PREFIX+"1/2/3/4/5/6", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertEquals(expectedResult, listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       // *****************************************
+       // now subscribe on a topic that overlaps the root # wildcard - we should still get
everything
+       // *****************************************
+       client.subscribe(ACCOUNT_PREFIX+"1/2/3");
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       expectedResult = "hello mqtt broker on explicit topic";
+       listener.result = null;
+       client.publish(ACCOUNT_PREFIX+"1/2/3", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertEquals(expectedResult, listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       expectedResult = "hello mqtt broker on some other topic";
+       listener.result = null;
+       client.publish(ACCOUNT_PREFIX+"a/b/c/d/e", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertEquals(expectedResult, listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       // *****************************************
+       // now unsub hash - we should only get called back on 1/2/3
+       // *****************************************
+       client.unsubscribe(ACCOUNT_PREFIX+"#");
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       expectedResult = "this should not come back...";
+        listener.result = null;
+       client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertNull(listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+       expectedResult = "this should not come back either...";
+        listener.result = null;
+       client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.result != null;
+            }
+        });
+       assertNull(listener.result);
+       assertTrue(client.getPendingDeliveryTokens().length == 0);
+    }
+
+    @Test(timeout = 300000)
     public void testCleanSession() throws Exception {
         String topic = "test";
         final DefaultListener listener = new DefaultListener();
@@ -237,6 +362,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
     static class DefaultListener implements MqttCallback {
 
         int received = 0;
+        String result;
 
         @Override
         public void connectionLost(Throwable cause) {
@@ -247,6 +373,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         public void messageArrived(String topic, MqttMessage message) throws Exception {
             LOG.info("Received: " + message);
             received++;
+            result = new String(message.getPayload());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
index 2f0f92c..3cfbc64 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
@@ -330,6 +330,15 @@ public class DestinationMapTest extends TestCase {
         assertMapValue("FOO.>", v2);
     }
 
+    public void testRemoveWildcard() throws Exception {
+        put("FOO.A", v1);
+        put("FOO.>", v2);
+
+        map.removeAll(createDestination("FOO.>"));
+
+        assertMapValue("FOO.A", null);
+    }
+
     protected void loadSample2() {
         put("TEST.FOO", v1);
         put("TEST.*", v2);

http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
index f30cdb2..7c078b5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
@@ -65,8 +65,10 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
 
         sendReceive("local.test.1", true, "Consumer.a.local.test.1", false, 1, 1);
         sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1);
+        sendReceive("local.test.1.2", true, "Consumer.a.local.test.>", false, 1, 1);
         sendReceive("global.test.1", true, "Consumer.a.global.test.1", false, 1, 1);
         sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1);
+        sendReceive("global.test.1.2", true, "Consumer.a.global.test.>", false, 1, 1);
 
         destroyAllBrokers();
     }


Mime
View raw message