Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BBD0417ED9 for ; Wed, 18 Feb 2015 17:29:27 +0000 (UTC) Received: (qmail 40353 invoked by uid 500); 18 Feb 2015 17:29:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 40305 invoked by uid 500); 18 Feb 2015 17:29:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 40296 invoked by uid 99); 18 Feb 2015 17:29:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Feb 2015 17:29:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 850FCE0535; Wed, 18 Feb 2015 17:29:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dejanb@apache.org To: commits@activemq.apache.org Message-Id: <4882b25aa17e42d38e8b8160db36e457@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5594 - virtual topics and wildcards Date: Wed, 18 Feb 2015 17:29:27 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 141ad4cb8 -> 05c311240 https://issues.apache.org/jira/browse/AMQ-5594 - virtual topics and wildcards Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/05c31124 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/05c31124 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/05c31124 Branch: refs/heads/master Commit: 05c31124021d18db8248f0a260d794e3b5d11823 Parents: 141ad4c Author: Dejan Bosanac Authored: Wed Feb 18 18:29:05 2015 +0100 Committer: Dejan Bosanac Committed: Wed Feb 18 18:29:20 2015 +0100 ---------------------------------------------------------------------- .../region/virtual/MappedQueueFilter.java | 59 ++++--- .../broker/region/virtual/VirtualTopic.java | 3 +- .../activemq/filter/DestinationMapNode.java | 9 +- .../activemq/transport/mqtt/PahoMQTTTest.java | 167 ++++++++++++++++--- .../activemq/filter/DestinationMapTest.java | 9 + ...okerVirtualDestinationsWithWildcardTest.java | 2 + 6 files changed, 200 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index 38ccf5d..e8de910 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -48,32 +48,34 @@ public class MappedQueueFilter extends DestinationFilter { // recover messages for first consumer only boolean noSubs = getConsumers().isEmpty(); - super.addSubscription(context, sub); - - if (noSubs && !getConsumers().isEmpty()) { - // new subscription added, recover retroactive messages - final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); - final Set virtualDests = regionBroker.getDestinations(virtualDestination); - - final ActiveMQDestination newDestination = sub.getActiveMQDestination(); - final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); - - for (Destination virtualDest : virtualDests) { - if (virtualDest.getActiveMQDestination().isTopic() && - (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { - - Topic topic = (Topic) getBaseDestination(virtualDest); - if (topic != null) { - // re-use browse() to get recovered messages - final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); - - // add recovered messages to subscription - for (Message message : messages) { - final Message copy = message.copy(); - copy.setOriginalDestination(message.getDestination()); - copy.setDestination(newDestination); - copy.setRegionDestination(regionDest); - sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); + if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination())) { + super.addSubscription(context, sub); + + if (noSubs && !getConsumers().isEmpty()) { + // new subscription added, recover retroactive messages + final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); + final Set virtualDests = regionBroker.getDestinations(virtualDestination); + + final ActiveMQDestination newDestination = sub.getActiveMQDestination(); + final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + + for (Destination virtualDest : virtualDests) { + if (virtualDest.getActiveMQDestination().isTopic() && + (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { + + Topic topic = (Topic) getBaseDestination(virtualDest); + if (topic != null) { + // re-use browse() to get recovered messages + final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); + + // add recovered messages to subscription + for (Message message : messages) { + final Message copy = message.copy(); + copy.setOriginalDestination(message.getDestination()); + copy.setDestination(newDestination); + copy.setRegionDestination(regionDest); + sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); + } } } } @@ -99,4 +101,9 @@ public class MappedQueueFilter extends DestinationFilter { public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { super.deleteSubscription(context, key); } + + @Override + public String toString() { + return "MappedQueueFilter[" + virtualDestination + ", " + next + "]"; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index c6ab07e..769c784 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -91,10 +91,11 @@ public class VirtualTopic implements VirtualDestination { @Override public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { - if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) { + if (destination.isQueue() && destination.isPattern()) { DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT)); if (filter.matches(destination)) { broker.addDestination(context, destination, false); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java index bd82a93..4f6ad5a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java @@ -112,10 +112,15 @@ public class DestinationMapNode implements DestinationNode { @SuppressWarnings({ "rawtypes", "unchecked" }) protected void removeDesendentValues(Set answer) { + ArrayList candidates = new ArrayList<>(); for (Map.Entry child : childNodes.entrySet()) { + candidates.add(child.getValue()); + } + + for (DestinationNode node : candidates) { // remove all the values from the child - answer.addAll(child.getValue().removeValues()); - answer.addAll(child.getValue().removeDesendentValues()); + answer.addAll(node.removeValues()); + answer.addAll(node.removeDesendentValues()); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index e5e5fe5..263cafc 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,40 +16,29 @@ */ package org.apache.activemq.transport.mqtt; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.util.Wait; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import static org.junit.Assert.*; public class PahoMQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); - @Override - @Before - public void setUp() throws Exception { - protocolConfig = "transport.activeMQSubscriptionPrefetch=32766"; - super.setUp(); - } - @Test(timeout = 300000) public void testLotsOfClients() throws Exception { @@ -141,6 +130,142 @@ public class PahoMQTTTest extends MQTTTestSupport { } @Test(timeout = 300000) + public void testSubs() throws Exception { + + stopBroker(); + protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; + startBroker(); + + final DefaultListener listener = new DefaultListener(); + // subscriber connects and creates durable sub + MqttClient client = createClient(false, "receive", listener); + + final String ACCOUNT_PREFIX = "test/"; + + + client.subscribe(ACCOUNT_PREFIX+"1/2/3"); + client.subscribe(ACCOUNT_PREFIX+"a/+/#"); + client.subscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + String expectedResult = "should get everything"; + client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false); + + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + + assertTrue(client.getPendingDeliveryTokens().length == 0); + assertEquals(expectedResult, listener.result); + } + + @Test(timeout=300000) + public void testOverlappingTopics() throws Exception { + + stopBroker(); + protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; + startBroker(); + + final DefaultListener listener = new DefaultListener(); + // subscriber connects and creates durable sub + MqttClient client = createClient(false, "receive", listener); + + final String ACCOUNT_PREFIX = "test/"; + + // ***************************************** + // check a simple # subscribe works + // ***************************************** + client.subscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + String expectedResult = "hello mqtt broker on hash"; + client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on a different topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3/4/5/6", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + // ***************************************** + // now subscribe on a topic that overlaps the root # wildcard - we should still get everything + // ***************************************** + client.subscribe(ACCOUNT_PREFIX+"1/2/3"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on explicit topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on some other topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"a/b/c/d/e", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + // ***************************************** + // now unsub hash - we should only get called back on 1/2/3 + // ***************************************** + client.unsubscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "this should not come back..."; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertNull(listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "this should not come back either..."; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertNull(listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + } + + @Test(timeout = 300000) public void testCleanSession() throws Exception { String topic = "test"; final DefaultListener listener = new DefaultListener(); @@ -237,6 +362,7 @@ public class PahoMQTTTest extends MQTTTestSupport { static class DefaultListener implements MqttCallback { int received = 0; + String result; @Override public void connectionLost(Throwable cause) { @@ -247,6 +373,7 @@ public class PahoMQTTTest extends MQTTTestSupport { public void messageArrived(String topic, MqttMessage message) throws Exception { LOG.info("Received: " + message); received++; + result = new String(message.getPayload()); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java index 2f0f92c..3cfbc64 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java @@ -330,6 +330,15 @@ public class DestinationMapTest extends TestCase { assertMapValue("FOO.>", v2); } + public void testRemoveWildcard() throws Exception { + put("FOO.A", v1); + put("FOO.>", v2); + + map.removeAll(createDestination("FOO.>")); + + assertMapValue("FOO.A", null); + } + protected void loadSample2() { put("TEST.FOO", v1); put("TEST.*", v2); http://git-wip-us.apache.org/repos/asf/activemq/blob/05c31124/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java index f30cdb2..7c078b5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java @@ -65,8 +65,10 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple sendReceive("local.test.1", true, "Consumer.a.local.test.1", false, 1, 1); sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1); + sendReceive("local.test.1.2", true, "Consumer.a.local.test.>", false, 1, 1); sendReceive("global.test.1", true, "Consumer.a.global.test.1", false, 1, 1); sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1); + sendReceive("global.test.1.2", true, "Consumer.a.global.test.>", false, 1, 1); destroyAllBrokers(); }