activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1186095 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/ test/java/org/apache/activemq/usecases/
Date Wed, 19 Oct 2011 11:13:19 GMT
Author: dejanb
Date: Wed Oct 19 11:13:19 2011
New Revision: 1186095

URL: http://svn.apache.org/viewvc?rev=1186095&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3550 - local option for virtual topics

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
Wed Oct 19 11:13:19 2011
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
@@ -29,10 +25,14 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
 public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
 
-    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix)
{
-        super(next, prefix, postfix);
+    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix,
boolean local) {
+        super(next, prefix, postfix, local);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
Wed Oct 19 11:13:19 2011
@@ -40,6 +40,7 @@ public class VirtualTopic implements Vir
     private String postfix = "";
     private String name = ">";
     private boolean selectorAware = false;
+    private boolean local = false;
 
 
     public ActiveMQDestination getVirtualDestination() {
@@ -47,8 +48,8 @@ public class VirtualTopic implements Vir
     }
 
     public Destination intercept(Destination destination) {
-        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(),
getPostfix()) : 
-            new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
+        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(),
getPostfix(), isLocal()) :
+            new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal());
     }
     
 
@@ -111,4 +112,12 @@ public class VirtualTopic implements Vir
     public boolean isSelectorAware() {
         return selectorAware;
     }
+
+    public boolean isLocal() {
+        return local;
+    }
+
+    public void setLocal(boolean local) {
+        this.local = local;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Wed Oct 19 11:13:19 2011
@@ -33,15 +33,17 @@ public class VirtualTopicInterceptor ext
 
     private String prefix;
     private String postfix;
+    private boolean local;
 
-    public VirtualTopicInterceptor(Destination next, String prefix, String postfix) {
+    public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean
local) {
         super(next);
         this.prefix = prefix;
         this.postfix = postfix;
+        this.local = local;
     }
 
     public void send(ProducerBrokerExchange context, Message message) throws Exception {
-        if (!message.isAdvisory()) {
+        if (!message.isAdvisory() && !(local && message.getBrokerPath() !=
null)) {
             ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
             send(context, message, queueConsumers);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
Wed Oct 19 11:13:19 2011
@@ -189,19 +189,19 @@ public class TwoBrokerVirtualDestDinamic
         nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
         nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
         nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-        nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+        //nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
         nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.QUEUE_TYPE));
         nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-        //nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+        nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
 
         NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
conduit);
         nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
         nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
         nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-        nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+        //nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
         nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.QUEUE_TYPE));
         nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-        //nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+        nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
     }
 
     private BrokerService createAndConfigureBroker(URI uri) throws Exception {
@@ -211,7 +211,9 @@ public class TwoBrokerVirtualDestDinamic
 
         // make all topics virtual and consumers use the default prefix
         VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
-        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new
VirtualTopic()});
+        VirtualTopic vTopic = new VirtualTopic();
+        vTopic.setLocal(true);
+        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{vTopic});
         DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{virtualDestinationInterceptor};
         broker.setDestinationInterceptors(destinationInterceptors);
         return broker;



Mime
View raw message