activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1242748 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/
Date Fri, 10 Feb 2012 11:48:57 GMT
Author: dejanb
Date: Fri Feb 10 11:48:56 2012
New Revision: 1242748

URL: http://svn.apache.org/viewvc?rev=1242748&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3706 - balance randomize with cluster update

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1242748&r1=1242747&r2=1242748&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Fri Feb 10 11:48:56 2012
@@ -39,7 +39,10 @@ import javax.management.ObjectName;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.regex.Pattern;
@@ -74,6 +77,8 @@ public class TransportConnector implemen
     private String updateClusterFilter;
     private boolean auditNetworkProducers = false;
 
+    Random rnd = new Random(System.currentTimeMillis());
+
     public TransportConnector() {
     }
 
@@ -401,29 +406,24 @@ public class TransportConnector implemen
     protected ConnectionControl getConnectionControl() {
         boolean rebalance = isRebalanceClusterClients();
         String connectedBrokers = "";
-        String self = "";
         String separator = "";
 
         if (isUpdateClusterClients()) {
-            if (brokerService.getDefaultSocketURIString() != null) {
-                self += brokerService.getDefaultSocketURIString();
-            }
-            if (rebalance == false) {
-                connectedBrokers += self;
-                separator = ",";
-            }
-            if (this.broker.getPeerBrokerInfos() != null) {
-                for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
-                    if (isMatchesClusterFilter(info.getBrokerName())) {
-                        connectedBrokers += separator;
-                        connectedBrokers += info.getBrokerURL();
-                        separator = ",";
-                    }
+            ArrayList<String> uris = new ArrayList<String>();
+            uris.add(brokerService.getDefaultSocketURIString());
+            for (BrokerInfo info: broker.getPeerBrokerInfos()) {
+                if (isMatchesClusterFilter(info.getBrokerName())) {
+                    uris.add(info.getBrokerURL());
                 }
             }
             if (rebalance) {
-                connectedBrokers += separator + self;
+                Collections.shuffle(uris, rnd);
             }
+            for (String uri: uris) {
+                connectedBrokers += separator + uri;
+                separator = ",";
+            }
+            
         }
         ConnectionControl control = new ConnectionControl();
         control.setConnectedBrokers(connectedBrokers);
@@ -437,6 +437,9 @@ public class TransportConnector implemen
             ConnectionControl control = getConnectionControl();
             for (Connection c : this.connections) {
                 c.updateClient(control);
+                if (isRebalanceClusterClients()) {
+                    control = getConnectionControl();
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1242748&r1=1242747&r2=1242748&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Fri Feb 10 11:48:56 2012
@@ -50,6 +50,7 @@ import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -704,12 +705,16 @@ public class FailoverTransport implement
     }
 
     private List<URI> getConnectList() {
-        ArrayList<URI> l = new ArrayList<URI>(uris);
-        for (URI uri : updated) {
-            if (!l.contains(uri)) {
-                l.add(uri);
-            }
+        if (!updated.isEmpty()) {
+            if (failedConnectTransportURI != null) {
+                boolean removed = updated.remove(failedConnectTransportURI);
+                if (removed) {
+                    updated.add(failedConnectTransportURI);
+                }
+            }            
+            return updated;
         }
+        ArrayList<URI> l = new ArrayList<URI>(uris);
         boolean removed = false;
         if (failedConnectTransportURI != null) {
             removed = l.remove(failedConnectTransportURI);
@@ -1167,7 +1172,7 @@ public class FailoverTransport implement
 
     public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
         if (isUpdateURIsSupported()) {
-            List<URI> copy = new ArrayList<URI>(this.updated);
+            HashSet<URI> copy = new HashSet<URI>(this.updated);
             updated.clear();
             if (updatedURIs != null && updatedURIs.length > 0) {
                 for (URI uri : updatedURIs) {
@@ -1175,7 +1180,7 @@ public class FailoverTransport implement
                         updated.add(uri);
                     }
                 }
-                if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated))
{
+                if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new
HashSet(updated))) {
                     buildBackups();
                     synchronized (reconnectMutex) {
                         reconnect(rebalance);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1242748&r1=1242747&r2=1242748&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
Fri Feb 10 11:48:56 2012
@@ -50,7 +50,9 @@ public class FailoverClusterTestSupport 
     protected void assertClientsConnectedToTwoBrokers() {
         Set<String> set = new HashSet<String>();
         for (ActiveMQConnection c : connections) {
-            set.add(c.getTransportChannel().getRemoteAddress());
+            if (c.getTransportChannel().getRemoteAddress() != null) {
+                set.add(c.getTransportChannel().getRemoteAddress());
+            }
         }
         assertTrue("Only 2 connections should be found: " + set,
                 set.size() == 2);
@@ -59,17 +61,46 @@ public class FailoverClusterTestSupport 
     protected void assertClientsConnectedToThreeBrokers() {
         Set<String> set = new HashSet<String>();
         for (ActiveMQConnection c : connections) {
-            set.add(c.getTransportChannel().getRemoteAddress());
+            if (c.getTransportChannel().getRemoteAddress() != null) {
+                set.add(c.getTransportChannel().getRemoteAddress());
+            }
         }
         assertTrue("Only 3 connections should be found: " + set,
                 set.size() == 3);
     }
-
+    
+    protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
+    	Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+        int total = 0;
+        for (ActiveMQConnection c : connections) {
+        	String key = c.getTransportChannel().getRemoteAddress();
+            if (key != null) {
+                total++;
+                if (clientConnectionCounts.containsKey(key)) {
+                    double count = clientConnectionCounts.get(key);
+                    count += 1.0;
+                    clientConnectionCounts.put(key, count);
+                } else {
+                    clientConnectionCounts.put(key, 1.0);
+                }
+            }
+        }
+        Set<String> keys = clientConnectionCounts.keySet();
+        for(String key: keys){
+        	double count = (double)clientConnectionCounts.get(key);
+        	double percentage = count / (double)total;
+            logger.info(count + " of " + total + " connections for " + key + " = " + percentage);
+        	assertTrue("Connections distribution expected to be >= than " + minimumPercentage
+                    + ".  Actuall distribution was " + percentage + " for connection " +
key,
+                    percentage >= minimumPercentage);
+        }
+    }
+    
     protected void assertAllConnectedTo(String url) throws Exception {
         for (ActiveMQConnection c : connections) {
             assertEquals(c.getTransportChannel().getRemoteAddress(), url);
         }
-    }    
+    } 
 
     protected void addBroker(String name, BrokerService brokerService) {
         brokers.put(name, brokerService);
@@ -92,6 +123,7 @@ public class FailoverClusterTestSupport 
     protected void destroyBrokerCluster() throws JMSException, Exception {
         for (BrokerService b : brokers.values()) {
             b.stop();
+            b.waitUntilStopped();
         }
         brokers.clear();
     }
@@ -142,10 +174,11 @@ public class FailoverClusterTestSupport 
         createClients(NUMBER_OF_CLIENTS);
     }
     
-    protected void createClients(int num) throws Exception {
+    @SuppressWarnings("unused")
+	protected void createClients(int numOfClients) throws Exception {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 clientUrl);
-        for (int i = 0; i < num; i++) {
+        for (int i = 0; i < numOfClients; i++) {
             ActiveMQConnection c = (ActiveMQConnection) factory
                     .createConnection();
             c.start();
@@ -163,4 +196,4 @@ public class FailoverClusterTestSupport 
     public void setClientUrl(String clientUrl) {
         this.clientUrl = clientUrl;
     }
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1242748&r1=1242747&r2=1242748&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
Fri Feb 10 11:48:56 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.failover;
 
+
 /**
  * Complex cluster test that will exercise the dynamic failover capabilities of
  * a network of brokers. Using a networking of 3 brokers where the 3rd broker is
@@ -37,9 +38,14 @@ public class FailoverComplexClusterTest 
     
     
 
+    /**
+     * Basic dynamic failover 3 broker test 
+     * 
+     * @throws Exception
+     */
     public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
 
-        initSingleTcBroker("", null);
+        initSingleTcBroker("", null, null);
 
         Thread.sleep(2000);
 
@@ -47,36 +53,73 @@ public class FailoverComplexClusterTest 
         createClients();
         Thread.sleep(2000);
 
-        runTests(false);
+        runTests(false, null, null, null);
     }
 
+	/**
+	 * Tests a 3 broker configuration to ensure that the backup is random and
+	 * supported in a cluster. useExponentialBackOff is set to false and
+	 * maxReconnectAttempts is set to 1 to move through the list quickly for
+	 * this test.
+	 * 
+	 * @throws Exception
+	 */
+    public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception
{
 
-    public void testThreeBrokerClusterSingleConnectorBackup() throws Exception {
-
-        initSingleTcBroker("", null);
+        initSingleTcBroker("", null, null);
 
         Thread.sleep(2000);
 
-        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")?backup=true&backupPoolSize=2");
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")?backup=true&backupPoolSize=2&useExponentialBackOff=false&initialReconnectDelay=500");
         createClients();
         Thread.sleep(2000);
 
-        runTests(false);
+        runTests(false, null, null, null);
     }
 
-
+	/**
+	 * Tests a 3 broker cluster that passes in connection params on the
+	 * transport connector. Prior versions of AMQ passed the TC connection
+	 * params to the client and this should not happen. The chosen param is not
+	 * compatible with the client and will throw an error if used.
+	 * 
+	 * @throws Exception
+	 */
     public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
 
-        initSingleTcBroker("?transport.closeAsync=false", null);
+        initSingleTcBroker("?transport.closeAsync=false", null, null);
 
         Thread.sleep(2000);
         setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")");
         createClients();
         Thread.sleep(2000);
 
-        runTests(false);
+        runTests(false, null, null, null);
+    }
+
+
+    /**
+     * Tests a 3 broker cluster using a cluster filter of *
+     * 
+     * @throws Exception
+     */
+    public void testThreeBrokerClusterWithClusterFilter() throws Exception {
+
+        initSingleTcBroker("?transport.closeAsync=false", null, null);
+
+        Thread.sleep(2000);
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")");
+        createClients();
+
+        runTests(false, null, "*", null);
     }
 
+	/**
+	 * Test to verify that a broker with multiple transport connections only the
+	 * one marked to update clients is propagate
+	 * 
+	 * @throws Exception
+	 */
     public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
 
         initMultiTcCluster("", null);
@@ -87,11 +130,16 @@ public class FailoverComplexClusterTest 
         createClients();
         Thread.sleep(2000);
 
-        runTests(true);
+        runTests(true, null, null, null);
     }
 
+	/**
+	 * Test to verify the reintroduction of the A Broker
+	 * 
+	 * @throws Exception
+	 */
     public void testOriginalBrokerRestart() throws Exception {
-        initSingleTcBroker("", null);
+        initSingleTcBroker("", null, null);
 
         Thread.sleep(2000);
 
@@ -109,26 +157,95 @@ public class FailoverComplexClusterTest 
 
         assertClientsConnectedToTwoBrokers();
 
-        createBrokerA(false, "", null);
+        createBrokerA(false, null, null, null);
         getBroker(BROKER_A_NAME).waitUntilStarted();
         Thread.sleep(5000);
 
         assertClientsConnectedToThreeBrokers();
     }
 
+	/**
+	 * Test to ensure clients are evenly to all available brokers in the
+	 * network.
+	 * 
+	 * @throws Exception
+	 */
+    public void testThreeBrokerClusterClientDistributions() throws Exception {
+
+        initSingleTcBroker("", null, null);
+
+        Thread.sleep(2000);
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")?useExponentialBackOff=false&initialReconnectDelay=500");
+        createClients(100);
+        Thread.sleep(5000);
+
+        runClientDistributionTests(false, null, null, null);
+    }
+
+	/**
+	 * Test to verify that clients are distributed with no less than 20% of the
+	 * clients on any one broker.
+	 * 
+	 * @throws Exception
+	 */
+    public void testThreeBrokerClusterDestinationFilter() throws Exception {
+
+        initSingleTcBroker("", null, null);
+
+        Thread.sleep(2000);
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS
+ ")");
+        createClients();
+
+        runTests(false, null, null, "Queue.TEST.FOO.>");
+    }
+
+
+	/**
+	 * Runs a 3 Broker dynamic failover test: <br/>
+	 * <ul>
+	 * <li>asserts clients are distributed across all 3 brokers</li>
+	 * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
+	 * <li>asserts clients are distributed across all 3 brokers after
+	 * reintroducing the 3rd broker</li>
+	 * </ul>
+	 * 
+	 * @param multi
+	 * @param tcParams
+	 * @param clusterFilter
+	 * @param destinationFilter
+	 * @throws Exception
+	 * @throws InterruptedException
+	 */
+    private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter)
throws Exception, InterruptedException {
+        assertClientsConnectedToThreeBrokers();
+
+        getBroker(BROKER_C_NAME).stop();
+        getBroker(BROKER_C_NAME).waitUntilStopped();
+        removeBroker(BROKER_C_NAME);
+
+        Thread.sleep(5000);
+
+        assertClientsConnectedToTwoBrokers();
+        
+        createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
+        getBroker(BROKER_C_NAME).waitUntilStarted();
+        Thread.sleep(5000);
+
+        assertClientsConnectedToThreeBrokers();
+    }
+    
 
     /**
-     * Runs a 3 tests: <br/>
-     * <ul>
-     * <li>asserts clients are distributed across all 3 brokers</li>
-     * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
-     * <li>asserts clients are distributed across all 3 brokers after reintroducing
the 3rd broker</li>
-     * </ul>
+     * @param multi
+     * @param tcParams
+     * @param clusterFilter
+     * @param destinationFilter
      * @throws Exception
      * @throws InterruptedException
      */
-    private void runTests(boolean multi) throws Exception, InterruptedException {
+    private void runClientDistributionTests(boolean multi, String tcParams, String clusterFilter,
String destinationFilter) throws Exception, InterruptedException {
         assertClientsConnectedToThreeBrokers();
+        assertClientsConnectionsEvenlyDistributed(.25);
 
         getBroker(BROKER_C_NAME).stop();
         getBroker(BROKER_C_NAME).waitUntilStopped();
@@ -137,12 +254,14 @@ public class FailoverComplexClusterTest 
         Thread.sleep(5000);
 
         assertClientsConnectedToTwoBrokers();
+        assertClientsConnectionsEvenlyDistributed(.35);
 
-        createBrokerC(multi, "", null);
+        createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
         getBroker(BROKER_C_NAME).waitUntilStarted();
         Thread.sleep(5000);
 
         assertClientsConnectedToThreeBrokers();
+        assertClientsConnectionsEvenlyDistributed(.20);
     }
 
     @Override
@@ -152,30 +271,31 @@ public class FailoverComplexClusterTest 
     @Override
     protected void tearDown() throws Exception {
         shutdownClients();
-        destroyBrokerCluster();
         Thread.sleep(2000);
+        destroyBrokerCluster();
     }
 
-    private void initSingleTcBroker(String params, String clusterFilter) throws Exception
{
-        createBrokerA(false, params, clusterFilter);
-        createBrokerB(false, params, clusterFilter);
-        createBrokerC(false, params, clusterFilter);
+    private void initSingleTcBroker(String params, String clusterFilter, String destinationFilter)
throws Exception {
+        createBrokerA(false, params, clusterFilter, null);
+        createBrokerB(false, params, clusterFilter, null);
+        createBrokerC(false, params, clusterFilter, null);
         getBroker(BROKER_C_NAME).waitUntilStarted();
     }
 
     private void initMultiTcCluster(String params, String clusterFilter) throws Exception
{
-        createBrokerA(true, params, clusterFilter);
-        createBrokerB(true, params, clusterFilter);
-        createBrokerC(true, params, clusterFilter);
+        createBrokerA(true, params, clusterFilter, null);
+        createBrokerB(true, params, clusterFilter, null);
+        createBrokerC(true, params, clusterFilter, null);
         getBroker(BROKER_C_NAME).waitUntilStarted();
     }
     
-    private void createBrokerA(boolean multi, String params, String clusterFilter) throws
Exception {
+    private void createBrokerA(boolean multi, String params, String clusterFilter, String
destinationFilter) throws Exception {
+    	final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_A_NAME) == null) {
             addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-            addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS
+ params, true);
+            addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS
+ tcParams, true);
             if (multi) {
-                addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS,
false);
+                addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS
+ tcParams, false);
                 addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" +
BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
                 addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" +
BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
             } else {
@@ -186,12 +306,13 @@ public class FailoverComplexClusterTest 
         }
     }
 
-    private void createBrokerB(boolean multi, String params, String clusterFilter) throws
Exception {
+    private void createBrokerB(boolean multi, String params, String clusterFilter, String
destinationFilter) throws Exception {
+    	final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_B_NAME) == null) {
             addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-            addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS
+ params, true);
+            addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS
+ tcParams, true);
             if (multi) {
-                addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS,
false);
+                addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS
+ tcParams, false);
                 addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" +
BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
                 addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" +
BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
             } else {
@@ -202,12 +323,13 @@ public class FailoverComplexClusterTest 
         }
     }
 
-    private void createBrokerC(boolean multi, String params, String clusterFilter) throws
Exception {
+    private void createBrokerC(boolean multi, String params, String clusterFilter, String
destinationFilter) throws Exception {
+    	final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_C_NAME) == null) {
             addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
-            addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS
+ params, true);
+            addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS
+ tcParams, true);
             if (multi) {
-                addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS,
false);
+                addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS
+ tcParams, false);
                 addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" +
BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
                 addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" +
BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
             } else {



Mime
View raw message