Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 65123 invoked from network); 20 Jul 2006 05:37:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 20 Jul 2006 05:37:53 -0000 Received: (qmail 42549 invoked by uid 500); 20 Jul 2006 05:37:53 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 42516 invoked by uid 500); 20 Jul 2006 05:37:53 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 42506 invoked by uid 99); 20 Jul 2006 05:37:53 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2006 22:37:53 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2006 22:37:51 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 9EABB1A981A; Wed, 19 Jul 2006 22:37:31 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r423780 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/test: java/org/apache/activemq/network/ resources/org/apache/activemq/network/ Date: Thu, 20 Jul 2006 05:37:30 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060720053731.9EABB1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Wed Jul 19 22:37:29 2006 New Revision: 
423780 URL: http://svn.apache.org/viewvc?rev=423780&view=rev Log: Adding some network reconnect tests. These are used to validate that our network connections get re-established after a broker restart. Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java?rev=423780&view=auto ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java Wed Jul 19 22:37:29 2006 @@ -0,0 +1,314 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +/** + * These test cases are used to verify that network connections get re-established in all broker + * restart scenarios. 
+ * + * @author chirino + */ +public class NetworkReconnectTest extends TestCase { + + private BrokerService producerBroker; + private BrokerService consumerBroker; + private ActiveMQConnectionFactory producerConnectionFactory; + private ActiveMQConnectionFactory consumerConnectionFactory; + private Destination destination; + private ArrayList connections = new ArrayList(); + + public void testMultipleProducerBrokerRestarts() throws Exception { + for (int i = 0; i < 10; i++) { + testWithProducerBrokerRestart(); + disposeConsumerConnections(); + } + } + + public void testWithoutRestarts() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithProducerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the first broker... 
+ stopProducerBroker(); + startProducerBroker(); + + counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerRestart() throws Exception { + + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the second (consumer) broker... + stopConsumerBroker(); + waitForConsumerToLeave(counter); + startConsumerBroker(); + + consumer = createConsumer(); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerStartDelay() throws Exception { + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + Thread.sleep(1000*5); + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + + public void testWithProducerBrokerStartDelay() throws Exception { + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + + Thread.sleep(1000*5); + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message 
message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + protected void setUp() throws Exception { + producerConnectionFactory = createProducerConnectionFactory(); + consumerConnectionFactory = createConsumerConnectionFactory(); + destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); + + } + + protected void tearDown() throws Exception { + disposeConsumerConnections(); + try { + stopProducerBroker(); + } catch (Throwable e) { + } + try { + stopConsumerBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = (Connection) iter.next(); + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected void startProducerBroker() throws Exception { + if( producerBroker==null ) { + producerBroker = createFirstBroker(); + producerBroker.start(); + } + } + + protected void stopProducerBroker() throws Exception { + if( producerBroker!=null ) { + producerBroker.stop(); + producerBroker=null; + } + } + + protected void startConsumerBroker() throws Exception { + if( consumerBroker==null ) { + consumerBroker = createSecondBroker(); + consumerBroker.start(); + } + } + + protected void stopConsumerBroker() throws Exception { + if( consumerBroker!=null ) { + consumerBroker.stop(); + consumerBroker=null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); + } + + protected ActiveMQConnectionFactory createProducerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker1"); + } + + protected ActiveMQConnectionFactory 
createConsumerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker2"); + } + + protected String sendMessage() throws JMSException { + Connection connection = null; + try { + connection = producerConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + producer.send(message); + return message.getJMSMessageID(); + } finally { + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected MessageConsumer createConsumer() throws JMSException { + Connection connection = consumerConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(destination); + } + + protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception { + final AtomicInteger rc = new AtomicInteger(0); + Connection connection = cf.createConnection(); + connections.add(connection); + connection.start(); + + ConsumerEventSource source = new ConsumerEventSource(connection, destination); + source.setConsumerListener(new ConsumerListener(){ + public void onConsumerEvent(ConsumerEvent event) { + rc.set(event.getConsumerCount()); + } + }); + source.start(); + + return rc; + } + + protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() > 0 ) { + return; + } + Thread.sleep(50); + } + fail("The consumer did not arrive."); + } + + protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() == 0 ) { + return; + } + Thread.sleep(50); + } + fail("The consumer did not leave."); + } + +} Added: 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java?rev=423780&view=auto ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java Wed Jul 19 22:37:29 2006 @@ -0,0 +1,90 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; + + +/** + * Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels + * fool the TCP transport into thinking that they are initially connected. 
+ * + * @author chirino + */ +public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest { + + ArrayList processes = new ArrayList(); + + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml")); + } + + protected void setUp() throws Exception { + startProcess("ssh -Nn -L60006:localhost:61616 localhost"); + startProcess("ssh -Nn -L60007:localhost:61617 localhost"); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + for (Iterator iter = processes.iterator(); iter.hasNext();) { + Process p = (Process) iter.next(); + p.destroy(); + } + } + + private void startProcess(String command) throws IOException { + final Process process = Runtime.getRuntime().exec(command); + processes.add(process); + new Thread("stdout: "+command){ + public void run() { + try { + InputStream is = process.getInputStream(); + int c; + while((c=is.read())>=0) { + System.out.write(c); + } + } catch (IOException e) { + } + } + }.start(); + new Thread("stderr: "+command){ + public void run() { + try { + InputStream is = process.getErrorStream(); + int c; + while((c=is.read())>=0) { + System.err.write(c); + } + } catch (IOException e) { + } + } + }.start(); + } +} Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml?rev=423780&view=auto ============================================================================== --- 
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml Wed Jul 19 22:37:29 2006 @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml?rev=423780&view=auto ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml Wed Jul 19 22:37:29 2006 @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml?rev=423780&view=auto ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml Wed Jul 19 22:37:29 2006 @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml URL: 
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml?rev=423780&view=auto ============================================================================== --- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml (added) +++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml Wed Jul 19 22:37:29 2006 @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + +