activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r745558 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/
Date Wed, 18 Feb 2009 16:24:57 GMT
Author: dejanb
Date: Wed Feb 18 16:24:56 2009
New Revision: 745558

URL: http://svn.apache.org/viewvc?rev=745558&view=rev
Log:
additional fix for https://issues.apache.org/activemq/browse/AMQ-2104 and https://issues.apache.org/activemq/browse/AMQ-1509

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java	Wed Feb 18 16:24:56 2009
@@ -47,12 +47,12 @@
     }
 
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
-
         if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added
         }
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
+        info.setSelector(null);
         return doCreateDemandSubscription(info);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java	Wed Feb 18 16:24:56 2009
@@ -39,7 +39,7 @@
         localInfo = info.copy();
         localInfo.setNetworkSubscription(true);
         remoteSubsIds.add(info.getConsumerId());    
-     }
+    }
 
     /**
      * Increment the consumers associated with this subscription

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java	Wed Feb 18 16:24:56 2009
@@ -95,6 +95,7 @@
 
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
         }
+        info.setSelector(null);
         return doCreateDemandSubscription(info);
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java	Wed Feb 18 16:24:56 2009
@@ -75,27 +75,27 @@
     protected boolean verbose;
 
     protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
-        return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1);
+        return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, true);
     }
 
     protected void bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly) throws Exception {
         BrokerService localBroker = brokers.get(localBrokerName).broker;
         BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
 
-        bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1);
+        bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true);
     }
 
-    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
         BrokerService localBroker = brokers.get(localBrokerName).broker;
         BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
 
-        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL);
+        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit);
     }
 
     // Overwrite this method to specify how you want to bridge the two brokers
     // By default, bridge them using add network connector of the local broker
     // and the first connector of the remote broker
-    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception {
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
         List transportConnectors = remoteBroker.getTransportConnectors();
         URI remoteURI;
         if (!transportConnectors.isEmpty()) {
@@ -103,6 +103,7 @@
             NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:" + remoteURI));
             connector.setDynamicOnly(dynamicOnly);
             connector.setNetworkTTL(networkTTL);
+            connector.setConduitSubscriptions(conduit);
             localBroker.addNetworkConnector(connector);
             maxSetupTime = 2000;
             return connector;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java	Wed Feb 18 16:24:56 2009
@@ -136,7 +136,7 @@
 
 
     @Override
-    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception {
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
         List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
         URI remoteURI;
         if (!transportConnectors.isEmpty()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java	Wed Feb 18 16:24:56 2009
@@ -94,8 +94,8 @@
      */
     public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
     	// Setup broker networks
-        bridgeBrokers("BrokerB", "BrokerA");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
 
         startAllBrokers();
 
@@ -135,8 +135,8 @@
      */
     public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
     	// Setup broker networks
-        bridgeBrokers("BrokerB", "BrokerA");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
 
         startAllBrokers();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java	Wed Feb 18 16:24:56 2009
@@ -109,7 +109,7 @@
     }
 
     protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL);
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
         connector.setBridgeTempDestinations(enableTempDestinationBridging);
         return connector;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java	Wed Feb 18 16:24:56 2009
@@ -17,11 +17,15 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.HashMap;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 
+import junit.framework.Test;
+
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.transport.failover.FailoverUriTest;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -29,6 +33,7 @@
  */
 public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
     protected static final int MESSAGE_COUNT = 100;
+    public boolean dynamicOnly;
 
     /**
      * BrokerA -> BrokerB -> BrokerC
@@ -68,6 +73,52 @@
         assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
     }
+    
+    public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
+    	addCombinationValues("dynamicOnly", new Object[] {true, false});
+    }
+    
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void testABandBCbrokerNetworkWithSelectors() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, 2, true);
+        bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, 2, true);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerC", dest, "dummy = 33");
+        MessageConsumer clientB = createConsumer("BrokerC", dest, "dummy > 30");
+        MessageConsumer clientC = createConsumer("BrokerC", dest, "dummy = 34");
+
+        // let consumers propogate around the network
+        Thread.sleep(2000);
+        // Send messages
+        // Send messages for broker A
+        HashMap<String, Object> props = new HashMap<String, Object>();
+        props.put("dummy", 33);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT, props);
+        props.put("dummy", 34);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT * 2, props);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerC", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerC", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2) ;
+
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT *2, msgsC.getMessageCount());
+    }
 
     /**
      * BrokerA <- BrokerB -> BrokerC
@@ -237,4 +288,8 @@
         createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
         createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
     }
+    
+    public static Test suite() {
+    	return suite(ThreeBrokerTopicNetworkTest.class);
+    }
 }



Mime
View raw message