Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 31561 invoked from network); 9 Jan 2006 09:40:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 9 Jan 2006 09:40:02 -0000 Received: (qmail 45480 invoked by uid 500); 9 Jan 2006 09:40:01 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 45430 invoked by uid 500); 9 Jan 2006 09:40:01 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 45401 invoked by uid 99); 9 Jan 2006 09:40:00 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jan 2006 01:40:00 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 09 Jan 2006 01:39:59 -0800 Received: (qmail 31228 invoked by uid 65534); 9 Jan 2006 09:39:38 -0000 Message-ID: <20060109093938.31216.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r367253 - in /incubator/activemq/trunk: activemq-core/src/test/java/org/apache/activemq/ assembly/src/test/java/org/apache/activemq/usecases/ Date: Mon, 09 Jan 2006 09:39:33 -0000 To: activemq-commits@geronimo.apache.org From: aco@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: aco Date: Mon Jan 9 01:39:12 2006 New Revision: 367253 URL: http://svn.apache.org/viewcvs?rev=367253&view=rev Log: - Added test cases for multiple brokers 
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsUsingTcpTest.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=367253&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Mon Jan 9 01:39:12 2006 @@ -0,0 +1,367 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.*; + +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ConnectionClosedException; +import org.springframework.core.io.Resource; + +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Collections; +import java.util.Arrays; +import java.util.Collection; +import java.net.URI; + +/** + * Test case support that allows the easy management and connection of several brokers. 
+ * + * @version $Revision$ + */ +public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { + public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0"; + public static int MAX_SETUP_TIME = 5000; + + protected Map brokers; + protected Map destinations; + + protected int messageSize = 1; + + protected boolean verbose = false; + + protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { + BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker; + BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker; + + bridgeBrokers(localBroker, remoteBroker); + } + + // Override this method to specify how you want to bridge the two brokers + // By default, bridges them by adding a network connector on the local broker that targets the first transport connector of the remote broker + protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI; + if (!transportConnectors.isEmpty()) { + remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); + localBroker.addNetworkConnector("static:" + remoteURI); + } else { + throw new Exception("Remote broker has no registered connectors."); + } + + MAX_SETUP_TIME = 2000; + } + + // This will interconnect all brokers using multicast + protected void bridgeAllBrokers() throws Exception { + bridgeAllBrokers("default"); + } + + protected void bridgeAllBrokers(String groupName) throws Exception { + Collection brokerList = brokers.values(); + for (Iterator i=brokerList.iterator(); i.hasNext();) { + BrokerService broker = ((BrokerItem)i.next()).broker; + List transportConnectors = broker.getTransportConnectors(); + + if (transportConnectors.isEmpty()) { + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + transportConnectors = broker.getTransportConnectors(); + } + + TransportConnector transport = 
(TransportConnector)transportConnectors.get(0); + transport.setDiscoveryUri(new URI("multicast://" + groupName)); + broker.addNetworkConnector("multicast://" + groupName); + } + + // Multicasting may take longer to setup + MAX_SETUP_TIME = 8000; + } + + protected void startAllBrokers() throws Exception { + Collection brokerList = brokers.values(); + for (Iterator i=brokerList.iterator(); i.hasNext();) { + BrokerService broker = ((BrokerItem)i.next()).broker; + broker.start(); + } + + Thread.sleep(MAX_SETUP_TIME); + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(brokerName); + brokers.put(brokerName, new BrokerItem(broker)); + + return broker; + } + + protected BrokerService createBroker(URI brokerUri) throws Exception { + BrokerService broker = BrokerFactory.createBroker(brokerUri); + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + + return broker; + } + + protected BrokerService createBroker(Resource configFile) throws Exception { + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile); + brokerFactory.afterPropertiesSet(); + + BrokerService broker = brokerFactory.getBroker(); + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + + return broker; + } + + protected Connection createConnection(String brokerName) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createConnection(); + } + return null; + } + + protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createConsumer(dest); + } + return null; + } + + protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if 
(brokerItem != null) { + return brokerItem.createDurableSubscriber(dest, name); + } + return null; + } + + protected MessageIdList getBrokerMessages(String brokerName) { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.getAllMessages(); + } + return null; + } + + protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.getConsumerMessages(consumer); + } + return null; + } + + protected void sendMessages(String brokerName, Destination destination, int count) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + + Connection conn = brokerItem.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = brokerItem.createProducer(destination, sess); + + for (int i = 0; i < count; i++) { + TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); + producer.send(msg); + } + + producer.close(); + sess.close(); + conn.close(); + brokerItem.connections.remove(conn); + } + + protected TextMessage createTextMessage(Session session, String initText) throws Exception { + TextMessage msg = session.createTextMessage(); + + // Pad message text + if (initText.length() < messageSize) { + char[] data = new char[messageSize - initText.length()]; + Arrays.fill(data, '*'); + String str = new String(data); + msg.setText(initText + str); + + // Do not pad message text + } else { + msg.setText(initText); + } + + return msg; + } + + protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException { + Destination dest; + if (topic) { + dest = new ActiveMQTopic(name); + destinations.put(name, dest); + return (ActiveMQDestination)dest; + } else { + dest = new ActiveMQQueue(name); + destinations.put(name, dest); + return 
(ActiveMQDestination)dest; + } + } + + protected void setUp() throws Exception { + super.setUp(); + brokers = new HashMap(); + destinations = new HashMap(); + } + + protected void tearDown() throws Exception { + destroyAllBrokers(); + super.tearDown(); + } + + protected void destroyBroker(String brokerName) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName); + + if (brokerItem != null) { + brokerItem.destroy(); + } + } + + protected void destroyAllBrokers() throws Exception { + for (Iterator i=brokers.values().iterator(); i.hasNext();) { + BrokerItem brokerItem = (BrokerItem)i.next(); + brokerItem.destroy(); + } + brokers.clear(); + } + + + // Class to group broker components together + protected class BrokerItem { + public BrokerService broker; + public ActiveMQConnectionFactory factory; + public List connections; + public Map consumers; + public MessageIdList allMessages = new MessageIdList(); + + private IdGenerator id; + + public boolean persistent = false; + + public BrokerItem(BrokerService broker) throws Exception { + this.broker = broker; + + factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + consumers = Collections.synchronizedMap(new HashMap()); + connections = Collections.synchronizedList(new ArrayList()); + allMessages.setVerbose(verbose); + id = new IdGenerator(broker.getBrokerName() + ":"); + } + + public Connection createConnection() throws Exception { + Connection conn = factory.createConnection(); + conn.setClientID(id.generateId()); + + connections.add(conn); + return conn; + } + + public MessageConsumer createConsumer(Destination dest) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createConsumer(dest, s); + } + + public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception { + MessageConsumer client = sess.createConsumer(dest); + MessageIdList messageIdList = new 
MessageIdList(); + messageIdList.setParent(allMessages); + client.setMessageListener(messageIdList); + consumers.put(client, messageIdList); + + return client; + } + + public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createDurableSubscriber(dest, s, name); + } + + public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception { + MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name); + MessageIdList messageIdList = new MessageIdList(); + messageIdList.setParent(allMessages); + client.setMessageListener(messageIdList); + consumers.put(client, messageIdList); + + return client; + } + + public MessageIdList getAllMessages() { + return allMessages; + } + + public MessageIdList getConsumerMessages(MessageConsumer consumer) { + return (MessageIdList)consumers.get(consumer); + } + + public MessageProducer createProducer(Destination dest) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createProducer(dest, s); + } + + public MessageProducer createProducer(Destination dest, Session sess) throws Exception { + MessageProducer client = sess.createProducer(dest); + client.setDeliveryMode(persistent ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + return client; + } + + public void destroy() throws Exception { + while (!connections.isEmpty()) { + Connection c = (Connection)connections.remove(0); + try { + c.close(); + } catch (ConnectionClosedException e) { + e.printStackTrace(); + } + } + + broker.stop(); + consumers.clear(); + + broker = null; + connections = null; + consumers = null; + factory = null; + } + } + +} Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java?rev=367253&view=auto ============================================================================== --- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java (added) +++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java Mon Jan 9 01:39:12 2006 @@ -0,0 +1,119 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.usecases; + +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.JmsMultipleBrokersTestSupport; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import java.util.Map; +import java.util.HashMap; +import java.net.URI; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport { + public static final int BROKER_COUNT = 5; // number of brokers to network + public static final int CONSUMER_COUNT = 3; // consumers per broker + public static final int PRODUCER_COUNT = 3; // producers per broker + public static final int MESSAGE_COUNT = 10; // messages per producer + + protected Map consumerMap; + + public void testTopicAllConnected() throws Exception { + bridgeAllBrokers(); + startAllBrokers(); + + + // Setup topic destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + for (int i=1; i<=BROKER_COUNT; i++) { + for (int j=0; j BrokerB -> BrokerC + */ + public void test_AB_BC_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + + // Let's try to wait for any messages. Should be none. 
+ Thread.sleep(1000); + + // Get message count + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + assertEquals(0, msgsC.getMessageCount()); + } + + /** + * BrokerA <- BrokerB -> BrokerC + */ + public void test_BA_BC_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerB", dest, 10); + + // Let's try to wait for any messages. + Thread.sleep(1000); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + // Total received should be 10 + assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount()); + } + + /** + * BrokerA -> BrokerB <- BrokerC + */ + public void test_AB_CB_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerC", "BrokerB"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientB = createConsumer("BrokerB", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + + msgsB.waitForMessagesToArrive(20); + + assertEquals(20, msgsB.getMessageCount()); + } + + /** + * BrokerA <-> BrokerB <-> BrokerC + */ + public void testAllConnectedBrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("BrokerC", "BrokerB"); + 
bridgeBrokers("BrokerA", "BrokerC"); + bridgeBrokers("BrokerC", "BrokerA"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Let's try to wait for any messages. + Thread.sleep(1000); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); + } + + /** + * BrokerA <-> BrokerB <-> BrokerC + */ + public void testAllConnectedUsingMulticast() throws Exception { + // Setup broker networks + bridgeAllBrokers(); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Let's try to wait for any messages. 
+ Thread.sleep(1000); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); + } + + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false")); + } +} Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java?rev=367253&view=auto ============================================================================== --- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java (added) +++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java Mon Jan 9 01:39:12 2006 @@ -0,0 +1,60 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.transport.TransportFactory; + +import java.util.List; +import java.util.ArrayList; +import java.net.URI; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetworkTest { + protected List bridges = new ArrayList(); + + protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception { + List remoteTransports = remoteBroker.getTransportConnectors(); + List localTransports = localBroker.getTransportConnectors(); + + URI remoteURI, localURI; + if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) { + remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri(); + localURI = ((TransportConnector)localTransports.get(0)).getConnectUri(); + + // Ensure that we are connecting using tcp + if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) { + DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI), + TransportFactory.connect(remoteURI)); + bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName()); + bridges.add(bridge); + + bridge.start(); + } else { + throw new Exception("Remote broker or local broker is not using tcp connectors"); + } + } else { + throw new Exception("Remote broker or local broker has no registered connectors."); + } + + MAX_SETUP_TIME = 2000; + } +} Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=367253&view=auto ============================================================================== --- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java (added) +++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java Mon Jan 9 01:39:12 2006 @@ -0,0 +1,226 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.usecases; + +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.JmsMultipleBrokersTestSupport; + +import javax.jms.MessageConsumer; +import javax.jms.Destination; +import java.net.URI; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { + + /** + * BrokerA -> BrokerB -> BrokerC + */ + public void test_AB_BC_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + msgsA.waitForMessagesToArrive(10); + msgsB.waitForMessagesToArrive(20); + msgsC.waitForMessagesToArrive(20); + + assertEquals(10, msgsA.getMessageCount()); + assertEquals(20, msgsB.getMessageCount()); + assertEquals(20, msgsC.getMessageCount()); + } + + /** + * BrokerA <- BrokerB -> BrokerC + */ + public void test_BA_BC_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = 
createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + msgsA.waitForMessagesToArrive(20); + msgsB.waitForMessagesToArrive(10); + msgsC.waitForMessagesToArrive(20); + + assertEquals(20, msgsA.getMessageCount()); + assertEquals(10, msgsB.getMessageCount()); + assertEquals(20, msgsC.getMessageCount()); + } + + /** + * BrokerA -> BrokerB <- BrokerC + */ + public void test_AB_CB_BrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerC", "BrokerB"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + msgsA.waitForMessagesToArrive(10); + msgsB.waitForMessagesToArrive(30); + msgsC.waitForMessagesToArrive(10); + + assertEquals(10, msgsA.getMessageCount()); + assertEquals(30, msgsB.getMessageCount()); + assertEquals(10, msgsC.getMessageCount()); + } + + /** + * BrokerA <-> BrokerB <-> BrokerC + */ + public void testAllConnectedBrokerNetwork() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerA"); + 
bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("BrokerA", "BrokerC"); + bridgeBrokers("BrokerC", "BrokerA"); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + msgsA.waitForMessagesToArrive(30); + msgsB.waitForMessagesToArrive(30); + msgsC.waitForMessagesToArrive(30); + + assertEquals(30, msgsA.getMessageCount()); + assertEquals(30, msgsB.getMessageCount()); + assertEquals(30, msgsC.getMessageCount()); + } + + /** + * BrokerA <-> BrokerB <-> BrokerC + */ + public void testAllConnectedUsingMulticast() throws Exception { + // Setup broker networks + bridgeAllBrokers(); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", true); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + MessageConsumer clientB = createConsumer("BrokerB", dest); + MessageConsumer clientC = createConsumer("BrokerC", dest); + + // Send messages + sendMessages("BrokerA", dest, 10); + sendMessages("BrokerB", dest, 10); + sendMessages("BrokerC", dest, 10); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); + MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + + msgsA.waitForMessagesToArrive(30); + msgsB.waitForMessagesToArrive(30); + 
msgsC.waitForMessagesToArrive(30); + + assertEquals(30, msgsA.getMessageCount()); + assertEquals(30, msgsB.getMessageCount()); + assertEquals(30, msgsC.getMessageCount()); + } + + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); + createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false")); + } +} Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java?rev=367253&view=auto ============================================================================== --- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java (added) +++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java Mon Jan 9 01:39:12 2006 @@ -0,0 +1,60 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.usecases; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.transport.TransportFactory; + +import java.util.List; +import java.util.ArrayList; +import java.net.URI; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetworkTest { + protected List bridges = new ArrayList(); + + protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception { + List remoteTransports = remoteBroker.getTransportConnectors(); + List localTransports = localBroker.getTransportConnectors(); + + URI remoteURI, localURI; + if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) { + remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri(); + localURI = ((TransportConnector)localTransports.get(0)).getConnectUri(); + + // Ensure that we are connecting using tcp + if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) { + DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI), + TransportFactory.connect(remoteURI)); + bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName()); + bridges.add(bridge); + + bridge.start(); + } else { + throw new Exception("Remote broker or local broker is not using tcp connectors"); + } + } else { + throw new Exception("Remote broker or local broker has no registered connectors."); + } + + MAX_SETUP_TIME = 2000; + } +}