activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1205508 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/ test/resources/org/apache/activemq/network/multicast/
Date Wed, 23 Nov 2011 17:44:53 GMT
Author: dejanb
Date: Wed Nov 23 17:44:52 2011
New Revision: 1205508

URL: http://svn.apache.org/viewvc?rev=1205508&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3384 - destinationFilter config out of dynamically
included destinations

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1205508&r1=1205507&r2=1205508&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Nov 23 17:44:52 2011
@@ -16,17 +16,6 @@
  */
 package org.apache.activemq.network;
 
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.ObjectName;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -37,47 +26,31 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.DefaultTransportListener;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.*;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A useful base class for implementing demand forwarding bridges.
  *
@@ -311,7 +284,7 @@ public abstract class DemandForwardingBr
                 // determine demand.
                 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
                 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
-                String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter();
+                String advisoryTopic = configuration.getDestinationFilter();
                 if (configuration.isBridgeTempDestinations()) {
                     advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1205508&r1=1205507&r2=1205508&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
Wed Nov 23 17:44:52 2011
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.network;
 
-import java.util.List;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 
+import java.util.List;
+
 /**
  * Configuration for a NetworkBridge
  * 
@@ -39,7 +41,7 @@ public class NetworkBridgeConfiguration 
     private String brokerURL = "";
     private String userName;
     private String password;
-    private String destinationFilter = ">";
+    private String destinationFilter = null;
     private String name = "NC";
     
     private List<ActiveMQDestination> excludedDestinations;
@@ -211,7 +213,33 @@ public class NetworkBridgeConfiguration 
      * @return the destinationFilter
      */
     public String getDestinationFilter() {
-        return this.destinationFilter;
+        if (this.destinationFilter == null) {
+            if (dynamicallyIncludedDestinations != null && !dynamicallyIncludedDestinations.isEmpty())
{
+                StringBuffer filter = new StringBuffer();
+                String delimiter = "";
+                for (ActiveMQDestination destination : dynamicallyIncludedDestinations) {
+                    if (!destination.isTemporary()) {
+                        filter.append(delimiter);
+                        filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX);
+                        filter.append(destination.getDestinationTypeAsString());
+                        filter.append(".");
+                        filter.append(destination.getPhysicalName());
+                        delimiter = ",";
+                    }
+                }
+                return filter.toString();
+            }   else {
+                return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">";
+            }
+        }   else {
+            // prepend consumer advisory prefix
+            // to keep backward compatibility
+            if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX))
{
+                 return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + this.destinationFilter;
+            } else {
+                return this.destinationFilter;
+            }
+        }
     }
 
     /**

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java?rev=1205508&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
Wed Nov 23 17:44:52 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.TestCase;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class NetworkDestinationFilterTest extends TestCase {
+
+    public void testFilter() throws Exception {
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter());
+        List<ActiveMQDestination> dests = new ArrayList<ActiveMQDestination>();
+        config.setDynamicallyIncludedDestinations(dests);
+        assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter());
+        dests.add(new ActiveMQQueue("TEST.>"));
+        dests.add(new ActiveMQTopic("TEST.>"));
+        dests.add(new ActiveMQTempQueue("TEST.>"));
+        String prefix = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX;
+        assertEquals(prefix + "Queue.TEST.>," + prefix + "Topic.TEST.>", config.getDestinationFilter());
+    }
+
+
+}

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml?rev=1205508&r1=1205507&r2=1205508&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml
(original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml
Wed Nov 23 17:44:52 2011
@@ -15,7 +15,12 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<beans>
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
   <broker brokerName="localBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
     

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml?rev=1205508&r1=1205507&r2=1205508&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml
(original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml
Wed Nov 23 17:44:52 2011
@@ -15,7 +15,12 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<beans>
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
   <broker brokerName="remoteBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
 



Mime
View raw message