Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 93346 invoked from network); 13 Jan 2011 14:18:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 13 Jan 2011 14:18:51 -0000 Received: (qmail 46151 invoked by uid 500); 13 Jan 2011 14:18:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 46075 invoked by uid 500); 13 Jan 2011 14:18:49 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 46060 invoked by uid 99); 13 Jan 2011 14:18:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jan 2011 14:18:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jan 2011 14:18:43 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8738A23888FD; Thu, 13 Jan 2011 14:18:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1058577 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ main/java/org/apache/activemq/... 
Date: Thu, 13 Jan 2011 14:18:15 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110113141815.8738A23888FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Thu Jan 13 14:18:14 2011 New Revision: 1058577 URL: http://svn.apache.org/viewvc?rev=1058577&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3077 - ArraysIndexOutOfBoundsException : -32768 in "BrokerService[xxx] Task" thread - brokerInfo and peerBroker info explosion problems. A peer is a one-way relationship with networks; broker infos were being accumulated in duplicate for each connector and for multiple connectors. The peer broker info was maintained for each, which caused the marshalling problem. re: https://issues.apache.org/jira/browse/AMQ-2632 - the configuration is now respected so it can be selectively enabled and rebalance only occurs if we randomly choose an alternative. The nested peer broker info is not propagated in a connection control. Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jan 13 14:18:14 2011 @@ -1015,9 +1015,6 @@ public class TransportConnection impleme ignore.printStackTrace(); } } - if (brokerInfo != null) { - broker.removeBroker(this, brokerInfo); - } } LOG.debug("Connection Stopped: " + getRemoteAddress()); } @@ -1182,7 +1179,7 @@ public class TransportConnection impleme // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { - // We first look if existing network connection already exists for the same broker Id + // We first look if existing network connection already exists for the same broker Id and network connector name // It's possible in case of brief network fault to have this transport connector side of the connection always active // and the duplex network connector side wanting to open a new one // In this case, the old connection must be broken @@ -1234,7 +1231,6 @@ public class TransportConnection impleme LOG.warn("Unexpected extra broker info command received: " + info); } this.brokerInfo = info; - broker.addBroker(this, info); networkConnection = true; List connectionStates = listConnectionStates(); for (TransportConnectionState cs : connectionStates) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1058577&r1=1058576&r2=1058577&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Jan 13 14:18:14 2011 @@ -94,7 +94,7 @@ public class RegionBroker extends EmptyB private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); private final Map destinations = new ConcurrentHashMap(); - private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); + private final Map brokerInfos = new HashMap(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private BrokerId brokerId; @@ -640,14 +640,25 @@ public class RegionBroker extends EmptyB @Override public synchronized void addBroker(Connection connection, BrokerInfo info) { - brokerInfos.add(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing == null) { + existing = info.copy(); + existing.setPeerBrokerInfos(null); + brokerInfos.put(info.getBrokerId(), existing); + } + existing.incrementRefCount(); + LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); addBrokerInClusterUpdate(); } @Override public synchronized void removeBroker(Connection connection, BrokerInfo info) { if (info != null) { - brokerInfos.remove(info); + BrokerInfo existing = brokerInfos.get(info.getBrokerId()); + if (existing != null && existing.decrementRefCount() == 0) { + brokerInfos.remove(info.getBrokerId()); + } + LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); removeBrokerInClusterUpdate(); } } @@ -655,7 +666,7 @@ public class RegionBroker extends EmptyB @Override public synchronized BrokerInfo[] getPeerBrokerInfos() { BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; - result = brokerInfos.toArray(result); + result = 
brokerInfos.values().toArray(result); return result; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Thu Jan 13 14:18:14 2011 @@ -45,6 +45,7 @@ public class BrokerInfo extends BaseComm long connectionId; String brokerUploadUrl; String networkProperties; + transient int refCount = 0; public BrokerInfo copy() { BrokerInfo copy = new BrokerInfo(); @@ -265,4 +266,15 @@ public class BrokerInfo extends BaseComm } return result; } + + public int getRefCount() { + return refCount; + } + + public void incrementRefCount() { + refCount++; + } + public int decrementRefCount() { + return --refCount; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Jan 13 14:18:14 2011 @@ -228,20 +228,12 @@ public abstract class DemandForwardingBr localBroker.start(); remoteBroker.start(); - if (configuration.isDuplex() && duplexInitiatingConnection == null) { - // initiator side of duplex network - remoteBrokerNameKnownLatch.await(); - } if 
(!disposed.get()) { try { triggerRemoteStartBridge(); } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } - NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { - l.onStart(this); - } } else { LOG.warn ("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); @@ -309,6 +301,10 @@ public abstract class DemandForwardingBr localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex); + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null) { + l.onStart(this); + } LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); } else { @@ -419,6 +415,7 @@ public abstract class DemandForwardingBr ss.throwFirstException(); } } + brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); remoteBrokerNameKnownLatch.countDown(); @@ -480,6 +477,8 @@ public abstract class DemandForwardingBr serviceRemoteBrokerInfo(command); // Let the local broker know the remote broker's ID. 
localBroker.oneway(command); + // new peer broker (a consumer can work with remote broker also) + brokerService.getBroker().addBroker(null, remoteBrokerInfo); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jan 13 14:18:14 2011 @@ -120,6 +120,7 @@ public class FailoverTransport implement private SslContext brokerSslContext; private String updateURIsURL = null; private boolean rebalanceUpdateURIs=true; + private boolean doRebalance = false; public FailoverTransport() throws InterruptedIOException { brokerSslContext = SslContext.getCurrentSslContext(); @@ -131,7 +132,7 @@ public class FailoverTransport implement boolean buildBackup = true; boolean doReconnect = !disposed; synchronized (backupMutex) { - if (connectedTransport.get() == null && !disposed) { + if ((connectedTransport.get() == null || doRebalance) && !disposed) { result = doReconnect(); buildBackup = false; } @@ -623,7 +624,7 @@ public class FailoverTransport implement for (int i = 0; i < u.length; i++) { uris.remove(u[i]); } - reconnect(rebalance); + // rebalance is automatic if any connected to removed/stopped broker } public void add(boolean rebalance, String u) { @@ -643,15 +644,7 @@ public class FailoverTransport implement synchronized (reconnectMutex) { if (started) { if (rebalance) { - Transport 
transport = this.connectedTransport.getAndSet(null); - if (transport != null) { - try { - transport.stop(); - } catch (Exception e) { - LOG.debug("Caught an exception stopping existing transport", e); - } - } - + doRebalance = true; } LOG.debug("Waking up reconnect task"); try { @@ -683,7 +676,7 @@ public class FailoverTransport implement if (removed) { l.add(failedConnectTransportURI); } - LOG.debug("urlList connectionList:" + l); + LOG.debug("urlList connectionList:" + l + ", from: " + uris); return l; } @@ -798,13 +791,31 @@ public class FailoverTransport implement reconnectMutex.notifyAll(); } - if (connectedTransport.get() != null || disposed || connectionFailure != null) { + if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) { return false; } else { List connectList = getConnectList(); if (connectList.isEmpty()) { failure = new IOException("No uris available to connect to."); } else { + if (doRebalance) { + if (connectList.get(0).equals(connectedTransportURI)) { + // already connected to first in the list, no need to rebalance + doRebalance = false; + return false; + } else { + LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); + try { + Transport transport = this.connectedTransport.getAndSet(null); + if (transport != null) { + transport.stop(); + } + } catch (Exception e) { + LOG.debug("Caught an exception stopping existing transport for rebalance", e); + } + } + doRebalance = false; + } if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { reconnectDelay = initialReconnectDelay; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Thu Jan 13 14:18:14 2011 @@ -152,19 +152,23 @@ public class JmsMultipleBrokersTestSuppo maxSetupTime = 8000; } - - protected void waitForBridgeFormation() throws Exception { + + protected void waitForBridgeFormation(final int min) throws Exception { for (BrokerItem brokerItem : brokers.values()) { final BrokerService broker = brokerItem.broker; if (!broker.getNetworkConnectors().isEmpty()) { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); + return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min); }}); } } } + protected void waitForBridgeFormation() throws Exception { + waitForBridgeFormation(1); + } + protected void startAllBrokers() throws Exception { Collection brokerList = brokers.values(); for (Iterator i = brokerList.iterator(); i.hasNext();) { @@ -517,6 +521,7 @@ public class JmsMultipleBrokersTestSuppo } broker.stop(); + broker.waitUntilStopped(); consumers.clear(); broker = null; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1058577&r1=1058576&r2=1058577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Thu Jan 13 14:18:14 2011 @@ -35,31 +35,32 @@ import org.apache.activemq.network.Netwo public class FailoverClusterTest extends TestCase { 
-private static final int NUMBER = 10; -private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; -private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; -private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")"; -private static final String BROKER_A_NAME = "BROKERA"; -private static final String BROKER_B_NAME = "BROKERB"; -private BrokerService brokerA; -private BrokerService brokerB; -private final Listconnections = new ArrayList(); - - - public void testClusterConnectedAfterClients() throws Exception{ - createClients(); - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(3000); - Set set = new HashSet(); - for (ActiveMQConnection c:connections) { - set.add(c.getTransportChannel().getRemoteAddress()); - } - assertTrue(set.size() > 1); - } + private static final int NUMBER = 10; + private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; + private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; + private static final String BROKER_A_NAME = "BROKERA"; + private static final String BROKER_B_NAME = "BROKERB"; + private BrokerService brokerA; + private BrokerService brokerB; + private String clientUrl; - public void testClusterURIOptionsStrip() throws Exception{ + private final List connections = new ArrayList(); + + + public void testClusterConnectedAfterClients() throws Exception { + createClients(); + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(3000); + Set set = new HashSet(); + for (ActiveMQConnection c : connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue(set.size() > 1); + } + + public void testClusterURIOptionsStrip() throws Exception { createClients(); if (brokerB == null) { // add in server side only url param, should not be propagated @@ -67,45 +68,44 @@ private final Listco } Thread.sleep(3000); Set set = new HashSet(); - for 
(ActiveMQConnection c:connections) { + for (ActiveMQConnection c : connections) { set.add(c.getTransportChannel().getRemoteAddress()); } assertTrue(set.size() > 1); } - - public void testClusterConnectedBeforeClients() throws Exception{ - - if (brokerB == null) { - brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); - } - Thread.sleep(5000); - createClients(); - Thread.sleep(2000); - brokerA.stop(); - Thread.sleep(2000); - - URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); - for (ActiveMQConnection c:connections) { - String addr = c.getTransportChannel().getRemoteAddress(); - assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0); - } - } + + public void testClusterConnectedBeforeClients() throws Exception { + + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(5000); + createClients(); + Thread.sleep(2000); + brokerA.stop(); + Thread.sleep(2000); + + URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); + for (ActiveMQConnection c : connections) { + String addr = c.getTransportChannel().getRemoteAddress(); + assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); + } + } @Override protected void setUp() throws Exception { if (brokerA == null) { - brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false"); + clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")"; } - - } @Override protected void tearDown() throws Exception { - for (Connection c:connections) { + for (Connection c : connections) { c.close(); - } + } if (brokerB != null) { brokerB.stop(); brokerB = null; @@ -115,16 +115,16 @@ private final Listco brokerA = null; } } - + protected BrokerService createBrokerA(String uri) throws Exception { BrokerService answer = new BrokerService(); answer.setUseJmx(false); - configureConsumerBroker(answer,uri); + configureConsumerBroker(answer, uri); answer.start(); return answer; } - - 
protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception { + + protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception { answer.setBrokerName(BROKER_A_NAME); answer.setPersistent(false); TransportConnector connector = answer.addConnector(uri); @@ -132,33 +132,33 @@ private final Listco connector.setUpdateClusterClients(true); answer.setUseShutdownHook(false); } - + protected BrokerService createBrokerB(String uri) throws Exception { BrokerService answer = new BrokerService(); answer.setUseJmx(false); - configureNetwork(answer,uri); + configureNetwork(answer, uri); answer.start(); return answer; } - - protected void configureNetwork(BrokerService answer,String uri) throws Exception { + + protected void configureNetwork(BrokerService answer, String uri) throws Exception { answer.setBrokerName(BROKER_B_NAME); answer.setPersistent(false); - NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS); + NetworkConnector network = answer.addNetworkConnector("static://" + BROKER_A_BIND_ADDRESS); network.setDuplex(true); - TransportConnector connector =answer.addConnector(uri); + TransportConnector connector = answer.addConnector(uri); connector.setRebalanceClusterClients(true); connector.setUpdateClusterClients(true); answer.setUseShutdownHook(false); } - + protected void createClients() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL); - for (int i =0;i < NUMBER; i++) { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); + for (int i = 0; i < NUMBER; i++) { ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); c.start(); Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = s.createQueue(getClass().getName()); + Queue queue = s.createQueue(getClass().getName()); MessageConsumer consumer = s.createConsumer(queue); connections.add(c); } Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java?rev=1058577&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java Thu Jan 13 14:18:14 2011 @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.ThreadTracker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class NetworkOfTwentyBrokersTest extends JmsMultipleBrokersTestSupport { + private static final Log LOG = LogFactory.getLog(NetworkOfTwentyBrokersTest.class); + + // This will interconnect all brokers using multicast + protected void bridgeAllBrokers() throws Exception { + bridgeAllBrokers("TwentyBrokersTest", 1, false, false); + } + + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { + bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false); + } + + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception { + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + List transportConnectors = broker.getTransportConnectors(); + + if (transportConnectors.isEmpty()) { + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + transportConnectors = broker.getTransportConnectors(); + } + + TransportConnector transport = transportConnectors.get(0); + if (transport.getDiscoveryUri() == null) { + transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); + } + + List networkConnectors = broker.getNetworkConnectors(); + if (networkConnectors.isEmpty()) { + broker.addNetworkConnector("multicast://default?group=" 
+ groupName); + networkConnectors = broker.getNetworkConnectors(); + } + + NetworkConnector nc = networkConnectors.get(0); + nc.setNetworkTTL(ttl); + nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); + nc.setDecreaseNetworkConsumerPriority(decreasePriority); + } + + // Multicasting may take longer to setup + maxSetupTime = 8000; + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setBrokerName(brokerName); + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + brokers.put(brokerName, new BrokerItem(broker)); + + return broker; + } + + /* AMQ-3077 Bug */ + public void testBrokers() throws Exception { + int X = 20; + int i; + + LOG.info("Creating X Brokers"); + for (i = 0; i < X; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + + LOG.info("Stopping half the brokers"); + for (i = 0; i < X/2; i++) { + destroyBroker("Broker" + i); + } + + LOG.info("Waiting for complete stop"); + try { + Thread.sleep(10000); + } catch (Exception e) { + } + + verifyPeerBrokerInfos((X/2) -1); + + LOG.info("Recreating first half"); + for (i = 0; i < X/2; i++) { + createBroker("Broker" + i); + } + + bridgeAllBrokers(); + startAllBrokers(); + waitForBridgeFormation(X-1); + + verifyPeerBrokerInfos(X-1); + } + + public void testPeerBrokerCountHalfPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + + public void testPeerBrokerCountHalfPeerTwice() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("A", "B"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 0); + } + 
+ public void testPeerBrokerCountFullPeer() throws Exception { + createBroker("A"); + createBroker("B"); + bridgeBrokers("A", "B"); + bridgeBrokers("B", "A"); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + public void testPeerBrokerCountFullPeerDuplex() throws Exception { + createBroker("A"); + createBroker("B"); + NetworkConnector nc = bridgeBrokers("A", "B"); + nc.setDuplex(true); + startAllBrokers(); + verifyPeerBrokerInfo(brokers.get("A"), 1); + verifyPeerBrokerInfo(brokers.get("B"), 1); + } + + + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) { + BrokerService broker = brokerItem.broker; + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { + LOG.info(info.getBrokerName()); + } + assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); + } + + private void verifyPeerBrokerInfos(final int max) { + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + verifyPeerBrokerInfo(i.next(), max); + } + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadTracker.result(); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date