Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 824B897F7 for ; Wed, 1 Feb 2012 13:12:57 +0000 (UTC) Received: (qmail 35854 invoked by uid 500); 1 Feb 2012 13:12:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 35782 invoked by uid 500); 1 Feb 2012 13:12:56 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 35775 invoked by uid 99); 1 Feb 2012 13:12:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Feb 2012 13:12:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Feb 2012 13:12:54 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 714E7238897D for ; Wed, 1 Feb 2012 13:12:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1239118 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/ Date: Wed, 01 Feb 2012 13:12:34 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120201131234.714E7238897D@eris.apache.org> Author: dejanb Date: Wed Feb 1 13:12:33 2012 New Revision: 1239118 URL: http://svn.apache.org/viewvc?rev=1239118&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3685 - fixing cluster update feature Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb 1 13:12:33 2012 @@ -1358,7 +1358,7 @@ public class BrokerService implements Se public String getDefaultSocketURIString() { if (started.get()) { - if (this.defaultSocketURIString ==null) { + if (this.defaultSocketURIString == null) { for (TransportConnector tc:this.transportConnectors) { String result = null; try { @@ -1367,10 +1367,19 @@ public class BrokerService implements Se LOG.warn("Failed to get the ConnectURI for "+tc,e); } if (result != null) { - this.defaultSocketURIString =result; - break; + // find first publishable uri + if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { + this.defaultSocketURIString = result; + break; + } else { + // or use the first defined + if (this.defaultSocketURIString == null) { + this.defaultSocketURIString = result; + } + } } } + } return this.defaultSocketURIString; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Feb 1 13:12:33 2012 @@ -16,26 +16,6 @@ */ package org.apache.activemq.broker; -import java.io.EOFException; -import java.io.IOException; -import java.net.SocketException; -import java.net.URI; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.transaction.xa.XAResource; import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; @@ -69,6 +49,26 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import javax.transaction.xa.XAResource; +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketException; +import java.net.URI; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + public class TransportConnection implements Connection, Task, CommandVisitor { private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Feb 1 13:12:33 2012 @@ -209,7 +209,7 @@ public class TransportConnector implemen brokerInfo.setBrokerId(broker.getBrokerId()); brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); - brokerInfo.setBrokerURL(getPublishableConnectString(getServer().getConnectURI())); + brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); getServer().setAcceptListener(new TransportAcceptListener() { public void onAccept(final Transport transport) { try { @@ -402,28 +402,29 @@ public class TransportConnector implemen boolean rebalance = isRebalanceClusterClients(); String connectedBrokers = ""; String self = ""; + String separator = ""; if (isUpdateClusterClients()) { if (brokerService.getDefaultSocketURIString() != null) { self += brokerService.getDefaultSocketURIString(); - self += ","; } if (rebalance == false) { connectedBrokers += self; + separator = ","; } if (this.broker.getPeerBrokerInfos() != null) { for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { if (isMatchesClusterFilter(info.getBrokerName())) { + connectedBrokers += separator; connectedBrokers += info.getBrokerURL(); - connectedBrokers += ","; + separator = ","; } } } if (rebalance) { - connectedBrokers += self; + connectedBrokers += separator + self; } } - ConnectionControl control = new ConnectionControl(); control.setConnectedBrokers(connectedBrokers); control.setRebalanceConnection(rebalance); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java Wed Feb 1 13:12:33 2012 @@ -18,11 +18,12 @@ package org.apache.activemq.transport.failover; -import java.io.IOException; -import java.net.URI; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; +import java.io.IOException; +import java.net.URI; + class BackupTransport extends DefaultTransportListener{ private final FailoverTransport failoverTransport; private Transport transport; @@ -76,4 +77,9 @@ class BackupTransport extends DefaultTra } return false; } + + @Override + public String toString() { + return "Backup transport: " + uri; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Feb 1 13:12:33 2012 @@ -16,26 +16,6 @@ */ package org.apache.activemq.transport.failover; -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.InterruptedIOException; -import java.net.InetAddress; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -59,6 +39,25 @@ import org.apache.activemq.util.ServiceS import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; + /** * A Transport that is made reliable by being able to fail over to another * transport when a transport failure is detected. @@ -241,6 +240,7 @@ public class FailoverTransport implement } if (reconnectOk) { + updated.remove(failedConnectTransportURI); reconnectTask.wakeup(); } else { propagateFailureToExceptionListener(e); @@ -670,6 +670,7 @@ public class FailoverTransport implement private List getConnectList() { ArrayList l = new ArrayList(uris); + l.addAll(updated); boolean removed = false; if (failedConnectTransportURI != null) { removed = l.remove(failedConnectTransportURI); @@ -806,7 +807,6 @@ public class FailoverTransport implement if (disposed || connectionFailure != null) { reconnectMutex.notifyAll(); } - if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) { return false; } else { @@ -845,7 +845,12 @@ public class FailoverTransport implement // If we have a backup already waiting lets try it. synchronized (backupMutex) { if (backup && !backups.isEmpty()) { - BackupTransport bt = backups.remove(0); + ArrayList l = new ArrayList(backups); + if (randomize) { + Collections.shuffle(l); + } + BackupTransport bt = l.remove(0); + backups.remove(bt); transport = bt.getTransport(); uri = bt.getUri(); } @@ -1098,26 +1103,18 @@ public class FailoverTransport implement public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { if (isUpdateURIsSupported()) { List copy = new ArrayList(this.updated); - List add = new ArrayList(); + updated.clear(); if (updatedURIs != null && updatedURIs.length > 0) { - Set set = new HashSet(); for (URI uri : updatedURIs) { - if (uri != null) { - set.add(uri); - } - } - for (URI uri : set) { - if (copy.remove(uri) == false) { - add.add(uri); + if (uri != null && !uris.contains(uri)) { + updated.add(uri); } } synchronized (reconnectMutex) { - this.updated.clear(); - this.updated.addAll(add); - for (URI uri : copy) { - this.uris.remove(uri); + if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated)) { + buildBackups(); + reconnect(rebalance); } - add(rebalance, add.toArray(new URI[add.size()])); } } } Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1239118&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java Wed Feb 1 13:12:33 2012 @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.NetworkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class FailoverClusterTestSupport extends TestCase { + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + private static final int NUMBER_OF_CLIENTS = 30; + + private String clientUrl; + + private final Map brokers = new HashMap(); + private final List connections = new ArrayList(); + + protected void assertClientsConnectedToTwoBrokers() { + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue("Only 2 connections should be found: " + set, + set.size() == 2); + } + + protected void assertClientsConnectedToThreeBrokers() { + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue("Only 3 connections should be found: " + set, + set.size() == 3); + } + + protected void addBroker(String name, BrokerService brokerService) { + brokers.put(name, brokerService); + } + + protected BrokerService getBroker(String name) { + return brokers.get(name); + } + + protected BrokerService removeBroker(String name) { + return brokers.remove(name); + } + + protected void destroyBrokerCluster() throws JMSException, Exception { + for (BrokerService b : brokers.values()) { + b.stop(); + } + brokers.clear(); + } + + protected void shutdownClients() throws JMSException { + for (Connection c : connections) { + c.close(); + } + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setUseJmx(false); + answer.setBrokerName(brokerName); + answer.setUseShutdownHook(false); + return answer; + } + + protected void addTransportConnector(BrokerService brokerService, + String connectorName, String uri, boolean clustered) + throws Exception { + TransportConnector connector = brokerService.addConnector(uri); + connector.setName(connectorName); + if (clustered) { + connector.setRebalanceClusterClients(true); + connector.setUpdateClusterClients(true); + connector.setUpdateClusterClientsOnRemove(true); + } else { + connector.setRebalanceClusterClients(false); + connector.setUpdateClusterClients(false); + connector.setUpdateClusterClientsOnRemove(false); + } + } + + protected void addNetworkBridge(BrokerService answer, String bridgeName, + String uri, boolean duplex, String destinationFilter) + throws Exception { + NetworkConnector network = answer.addNetworkConnector(uri); + network.setName(bridgeName); + network.setDuplex(duplex); + if (destinationFilter != null && !destinationFilter.equals("")) { + network.setDestinationFilter(bridgeName); + } + } + + @SuppressWarnings("unused") + protected void createClients() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + clientUrl); + for (int i = 0; i < NUMBER_OF_CLIENTS; i++) { + ActiveMQConnection c = (ActiveMQConnection) factory + .createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = s.createQueue(getClass().getName()); + MessageConsumer consumer = s.createConsumer(queue); + connections.add(c); + } + } + + public String getClientUrl() { + return clientUrl; + } + + public void setClientUrl(String clientUrl) { + this.clientUrl = clientUrl; + } +} \ No newline at end of file Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1239118&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Wed Feb 1 13:12:33 2012 @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +/** + * Complex cluster test that will exercise the dynamic failover capabilities of + * a network of brokers. Using a networking of 3 brokers where the 3rd broker is + * removed and then added back in it is expected in each test that the number of + * connections on the client should start with 3, then have two after the 3rd + * broker is removed and then show 3 after the 3rd broker is reintroduced. + */ +public class FailoverComplexClusterTest extends FailoverClusterTestSupport { + + private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616"; + private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617"; + private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618"; + private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626"; + private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627"; + private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628"; + private static final String BROKER_A_NAME = "BROKERA"; + private static final String BROKER_B_NAME = "BROKERB"; + private static final String BROKER_C_NAME = "BROKERC"; + + + + public void testThreeBrokerClusterSingleConnectorBasic() throws Exception { + + initSingleTcBroker("", null); + + Thread.sleep(2000); + + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); + createClients(); + Thread.sleep(2000); + + runTests(false); + } + + + public void testThreeBrokerClusterSingleConnectorBackup() throws Exception { + + initSingleTcBroker("", null); + + Thread.sleep(2000); + + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2"); + createClients(); + Thread.sleep(2000); + + runTests(false); + } + + + public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception { + + initSingleTcBroker("?transport.closeAsync=false", null); + + Thread.sleep(2000); + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); + createClients(); + + runTests(false); + } + + public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception { + + initMultiTcCluster("", null); + + Thread.sleep(2000); + + setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); + createClients(); + + runTests(true); + } + + + /** + * Runs a 3 tests:
+ *
    + *
  • asserts clients are distributed across all 3 brokers
  • + *
  • asserts clients are distributed across 2 brokers after removing the 3rd
  • + *
  • asserts clients are distributed across all 3 brokers after reintroducing the 3rd broker
  • + *
+ * @throws Exception + * @throws InterruptedException + */ + private void runTests(boolean multi) throws Exception, InterruptedException { + assertClientsConnectedToThreeBrokers(); + + getBroker(BROKER_C_NAME).stop(); + getBroker(BROKER_C_NAME).waitUntilStopped(); + removeBroker(BROKER_C_NAME); + + Thread.sleep(5000); + + assertClientsConnectedToTwoBrokers(); + + createBrokerC(multi, "", null); + getBroker(BROKER_C_NAME).waitUntilStarted(); + Thread.sleep(5000); + + assertClientsConnectedToThreeBrokers(); + } + + @Override + protected void setUp() throws Exception { + } + + @Override + protected void tearDown() throws Exception { + shutdownClients(); + destroyBrokerCluster(); + Thread.sleep(2000); + } + + private void initSingleTcBroker(String params, String clusterFilter) throws Exception { + createBrokerA(false, params, clusterFilter); + createBrokerB(false, params, clusterFilter); + createBrokerC(false, params, clusterFilter); + getBroker(BROKER_C_NAME).waitUntilStarted(); + } + + private void initMultiTcCluster(String params, String clusterFilter) throws Exception { + createBrokerA(true, params, clusterFilter); + createBrokerB(true, params, clusterFilter); + createBrokerC(true, params, clusterFilter); + getBroker(BROKER_C_NAME).waitUntilStarted(); + } + + private void createBrokerA(boolean multi, String params, String clusterFilter) throws Exception { + if (getBroker(BROKER_A_NAME) == null) { + addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); + addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + params, true); + if (multi) { + addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } else { + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } + getBroker(BROKER_A_NAME).start(); + } + } + + private void createBrokerB(boolean multi, String params, String clusterFilter) throws Exception { + if (getBroker(BROKER_B_NAME) == null) { + addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); + addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + params, true); + if (multi) { + addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } else { + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } + getBroker(BROKER_B_NAME).start(); + } + } + + private void createBrokerC(boolean multi, String params, String clusterFilter) throws Exception { + if (getBroker(BROKER_C_NAME) == null) { + addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); + addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + params, true); + if (multi) { + addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } else { + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + } + getBroker(BROKER_C_NAME).start(); + } + } + +} Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java?rev=1239118&r1=1239117&r2=1239118&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java Wed Feb 1 13:12:33 2012 @@ -16,9 +16,6 @@ */ package org.apache.activemq.transport.failover; -import java.io.IOException; -import java.net.URI; - import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.MessageAck; @@ -32,6 +29,10 @@ import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + import static org.junit.Assert.*; public class FailoverTransportTest { @@ -99,7 +100,6 @@ public class FailoverTransportTest { // Track a connection tracker.track(connection); - try { this.transport.oneway(new RemoveInfo(new ConnectionId("1"))); } catch(Exception e) { @@ -128,7 +128,7 @@ public class FailoverTransportTest { protected Transport createTransport() throws Exception { Transport transport = TransportFactory.connect( - new URI("failover://(tcp://doesNotExist:1234)")); + new URI("failover://(tcp://localhost:1234)")); transport.setTransportListener(new TransportListener() { public void onCommand(Object command) {