activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1076651 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/virtual/ main/java/org/apache/activemq/filter/ test/java/org/apache/activemq/ test/java/org/apache/act...
Date Thu, 03 Mar 2011 15:24:33 GMT
Author: dejanb
Date: Thu Mar  3 15:24:32 2011
New Revision: 1076651

URL: http://svn.apache.org/viewvc?rev=1076651&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3197 - virtual destinations and wildcards

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Thu Mar  3 15:24:32 2011
@@ -129,7 +129,9 @@ public abstract class AbstractRegion imp
                     dest.start();
                     destinations.put(destination, dest);
                     destinationMap.put(destination, dest);
-                    addSubscriptionsForDestination(context, dest);
+                    if (!dest.getActiveMQDestination().isPattern()) {
+                        addSubscriptionsForDestination(context, dest);
+                    }
                 }
                 if (dest == null) {
                     throw new JMSException("The destination " + destination + " does not exist.");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java
Thu Mar  3 15:24:32 2011
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+
 /**
  * Represents a Composite Pattern of a {@link DestinationInterceptor}
  * 
@@ -42,5 +46,11 @@ public class CompositeDestinationInterce
             interceptors[i].remove(destination);
         } 
     }
+
+    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        for (int i = 0; i < interceptors.length; i++) {
+            interceptors[i].create(broker, context, destination);
+        }
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java
Thu Mar  3 15:24:32 2011
@@ -17,6 +17,10 @@
 package org.apache.activemq.broker.region;
 
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+
 /**
  * Represents an interceptor on destination instances.
  * 
@@ -28,4 +32,6 @@ public interface DestinationInterceptor 
     
     void remove(Destination destination);
 
+    void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception;
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Thu Mar  3 15:24:32 2011
@@ -438,6 +438,9 @@ public class RegionBroker extends EmptyB
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
+        if (destinationInterceptor != null) {
+            destinationInterceptor.create(this, context, destination);
+        }
         synchronized (purgeInactiveDestinationsTask) {
             switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
Thu Mar  3 15:24:32 2011
@@ -18,7 +18,10 @@ package org.apache.activemq.broker.regio
 
 import java.util.Collection;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
 
 /**
  * 
@@ -35,6 +38,8 @@ public abstract class CompositeDestinati
         return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
     }
     
+    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {
+    }
 
     public void remove(Destination destination) {        
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
Thu Mar  3 15:24:32 2011
@@ -16,9 +16,7 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.*;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationInterceptor;
@@ -86,6 +84,8 @@ public class MirroredQueue implements De
         
     }
 
+    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {}
+
     // Properties
     // -------------------------------------------------------------------------
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
Thu Mar  3 15:24:32 2011
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.command.ActiveMQDestination;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
Thu Mar  3 15:24:32 2011
@@ -21,10 +21,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationMap;
 
@@ -45,8 +48,8 @@ public class VirtualDestinationIntercept
         List<Destination> destinations = new ArrayList<Destination>();
         for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
             VirtualDestination virtualDestination = (VirtualDestination)iter.next();
-            Destination newNestination = virtualDestination.intercept(destination);
-            destinations.add(newNestination);
+            Destination newDestination = virtualDestination.intercept(destination);
+            destinations.add(newDestination);
         }
         if (!destinations.isEmpty()) {
             if (destinations.size() == 1) {
@@ -60,6 +63,12 @@ public class VirtualDestinationIntercept
     }
     
 
+    public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        for (VirtualDestination virt: virtualDestinations) {
+            virt.create(broker, context, destination);
+        }
+    }
+
     public synchronized void remove(Destination destination) {     
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
Thu Mar  3 15:24:32 2011
@@ -16,9 +16,13 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationFilter;
 
 /**
  * Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
@@ -48,6 +52,15 @@ public class VirtualTopic implements Vir
     }
     
 
+    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
+            DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
+            if (filter.matches(destination)) {
+                broker.addDestination(context, destination, false);
+            }
+        }
+    }
+
     public void remove(Destination destination) {        
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
Thu Mar  3 15:24:32 2011
@@ -47,7 +47,7 @@ public class PrefixDestinationFilter ext
         if (path.length >= length) {
             int size = length - 1;
             for (int i = 0; i < size; i++) {
-                if (!prefixes[i].equals(path[i])) {
+                if (!path[i].equals(ANY_CHILD) && !prefixes[i].equals(ANY_CHILD) && !prefixes[i].equals(path[i])) {
                     return false;
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Thu Mar  3 15:24:32 2011
@@ -185,6 +185,7 @@ public class JmsMultipleBrokersTestSuppo
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
             BrokerService broker = i.next().broker;
             broker.start();
+            broker.waitUntilStarted();
         }
 
         Thread.sleep(maxSetupTime);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java?rev=1076651&r1=1076650&r2=1076651&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
Thu Mar  3 15:24:32 2011
@@ -82,9 +82,9 @@ public class SingleBrokerVirtualDestinat
         startAllBrokers();
 
         sendReceive("Consumer.a.local.test.>", false, "Consumer.a.local.test.>", false, 1, 1);
-        sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
+        sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1);
         sendReceive("Consumer.a.global.test.>", false, "Consumer.a.global.test.>", false, 1, 1);
-        sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
+        sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1);
 
         destroyAllBrokers();
     }
@@ -109,8 +109,6 @@ public class SingleBrokerVirtualDestinat
 
     private BrokerService createAndConfigureBroker(URI uri) throws Exception {
         BrokerService broker = createBroker(uri);
-        // without this  testVirtualDestinationsWithWildcardWithoutIndividualVirtualQueue will fail
-        broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")});
 
         configurePersistenceAdapter(broker);
 



Mime
View raw message