Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7C63893B7 for ; Wed, 19 Oct 2011 11:13:42 +0000 (UTC) Received: (qmail 57062 invoked by uid 500); 19 Oct 2011 11:13:42 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 57011 invoked by uid 500); 19 Oct 2011 11:13:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 57004 invoked by uid 99); 19 Oct 2011 11:13:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2011 11:13:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2011 11:13:40 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D9BCC2388847 for ; Wed, 19 Oct 2011 11:13:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1186095 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/ test/java/org/apache/activemq/usecases/ Date: Wed, 19 Oct 2011 11:13:19 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111019111319.D9BCC2388847@eris.apache.org> Author: dejanb Date: Wed Oct 19 11:13:19 2011 New Revision: 1186095 URL: http://svn.apache.org/viewvc?rev=1186095&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3550 - local option for virtual topics 
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java Wed Oct 19 11:13:19 2011 @@ -16,10 +16,6 @@ */ package org.apache.activemq.broker.region.virtual; -import java.io.IOException; -import java.util.List; -import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; @@ -29,10 +25,14 @@ import org.apache.activemq.command.Messa import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import java.io.IOException; +import java.util.List; +import java.util.Set; + public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { - public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix) { - super(next, prefix, postfix); + public 
SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { + super(next, prefix, postfix, local); } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=1186095&r1=1186094&r2=1186095&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java Wed Oct 19 11:13:19 2011 @@ -40,6 +40,7 @@ public class VirtualTopic implements Vir private String postfix = ""; private String name = ">"; private boolean selectorAware = false; + private boolean local = false; public ActiveMQDestination getVirtualDestination() { @@ -47,8 +48,8 @@ public class VirtualTopic implements Vir } public Destination intercept(Destination destination) { - return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix()) : - new VirtualTopicInterceptor(destination, getPrefix(), getPostfix()); + return selectorAware ? 
new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : + new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()); } @@ -111,4 +112,12 @@ public class VirtualTopic implements Vir public boolean isSelectorAware() { return selectorAware; } + + public boolean isLocal() { + return local; + } + + public void setLocal(boolean local) { + this.local = local; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Wed Oct 19 11:13:19 2011 @@ -33,15 +33,17 @@ public class VirtualTopicInterceptor ext private String prefix; private String postfix; + private boolean local; - public VirtualTopicInterceptor(Destination next, String prefix, String postfix) { + public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { super(next); this.prefix = prefix; this.postfix = postfix; + this.local = local; } public void send(ProducerBrokerExchange context, Message message) throws Exception { - if (!message.isAdvisory()) { + if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) { ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination()); send(context, message, queueConsumers); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java?rev=1186095&r1=1186094&r2=1186095&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java Wed Oct 19 11:13:19 2011 @@ -189,19 +189,19 @@ public class TwoBrokerVirtualDestDinamic nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + //nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - //nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit); nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); 
nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + //nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - //nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); } private BrokerService createAndConfigureBroker(URI uri) throws Exception { @@ -211,7 +211,9 @@ public class TwoBrokerVirtualDestDinamic // make all topics virtual and consumers use the default prefix VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); - virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new VirtualTopic()}); + VirtualTopic vTopic = new VirtualTopic(); + vTopic.setLocal(true); + virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{vTopic}); DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{virtualDestinationInterceptor}; broker.setDestinationInterceptors(destinationInterceptors); return broker;