Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 12A83200D5A for ; Thu, 14 Dec 2017 22:01:35 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1185C160C25; Thu, 14 Dec 2017 21:01:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 30620160BFC for ; Thu, 14 Dec 2017 22:01:34 +0100 (CET) Received: (qmail 78260 invoked by uid 500); 14 Dec 2017 21:01:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78250 invoked by uid 99); 14 Dec 2017 21:01:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2017 21:01:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47043DFBEC; Thu, 14 Dec 2017 21:01:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <6a92f352c30a4226bf972954d4d225da@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: AMQ-6875 - Use the correct destination for Virtual destination consumers when using Virtual Topics Date: Thu, 14 Dec 2017 21:01:33 +0000 (UTC) archived-at: Thu, 14 Dec 2017 21:01:35 -0000 Repository: activemq Updated Branches: refs/heads/master d3e439378 -> 56baba96c AMQ-6875 - Use the correct destination for Virtual destination consumers when using Virtual Topics Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/56baba96 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/56baba96 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/56baba96 Branch: refs/heads/master Commit: 56baba96c657d4e44b88955a964d6c92ff39b822 Parents: d3e4393 Author: Christopher L. Shannon (cshannon) Authored: Thu Dec 14 16:00:37 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Thu Dec 14 16:01:10 2017 -0500 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 46 +++++++++++++++++++- .../network/VirtualConsumerDemandTest.java | 24 ++++++++-- 2 files changed, 65 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 1acd524..1508c61 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -17,6 +17,7 @@ package org.apache.activemq.advisory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionId; +import org.apache.activemq.filter.DestinationPath; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter { if(brokerConsumerDests.putIfAbsent(pair, info) == null) { LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); - info.setDestination(virtualDestination.getVirtualDestination()); + setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { @@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter { //this is the case of a real consumer coming online } else { info = info.copy(); - info.setDestination(virtualDestination.getVirtualDestination()); + setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { @@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter { } } + /** + * Sets the virtual destination on the ConsumerInfo + * If this is a VirtualTopic then the destination used will be the actual topic subscribed + * to in order to track demand properly + * + * @param info + * @param virtualDestination + * @param activeMQDest + */ + private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { + info.setDestination(virtualDestination.getVirtualDestination()); + if (virtualDestination instanceof VirtualTopic) { + VirtualTopic vt = (VirtualTopic) virtualDestination; + String prefix = vt.getPrefix() != null ? vt.getPrefix() : ""; + String postfix = vt.getPostfix() != null ? vt.getPostfix() : ""; + if (prefix.endsWith(".")) { + prefix = prefix.substring(0, prefix.length() - 1); + } + if (postfix.startsWith(".")) { + postfix = postfix.substring(1, postfix.length()); + } + ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null; + ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null; + + String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {}; + String[] activeMQDestPaths = activeMQDest.getDestinationPaths(); + String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {}; + + //sanity check + if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) { + String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length, + activeMQDestPaths.length - postfixPaths.length); + + ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath)); + info.setDestination(newTopic); + } + } + } + @Override public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination) { http://git-wip-us.apache.org/repos/asf/activemq/blob/56baba96/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 782f53f..af5c316 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { * @throws Exception */ @Test(timeout = 60 * 1000) - public void testVirtualTopic() throws Exception { + public void testVirtualTopics() throws Exception { Assume.assumeTrue(isUseVirtualDestSubsOnCreation); doSetUp(true, null); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>"); MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar")); + MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2")); + MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3")); Thread.sleep(2000); Message test = localSession.createTextMessage("test"); final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics(); + final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics(); + + //No queue destination on the remote side so should not forward + final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics(); //this will create the destination so messages accumulate final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics(); + final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics(); + waitForConsumerCount(destinationStatistics, 1); + waitForConsumerCount(destinationStatistics2, 1); includedProducer.send(test); + includedProducer2.send(localSession.createTextMessage("test2")); + includedProducer3.send(localSession.createTextMessage("test3")); //assert statistics waitForDispatchFromLocalBroker(destinationStatistics, 1); + waitForDispatchFromLocalBroker(destinationStatistics2, 1); assertLocalBrokerStatistics(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics2, 1); assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount()); + assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount()); - assertRemoteAdvisoryCount(advisoryConsumer, 1); - assertAdvisoryBrokerCounts(1,1,1); + assertRemoteAdvisoryCount(advisoryConsumer, 2); + assertAdvisoryBrokerCounts(1,2,2); + + //should not have forwarded for 3rd topic + Thread.sleep(1000); + assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount()); }