activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r636106 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/ReconnectTest.java
Date Tue, 11 Mar 2008 21:17:57 GMT
Author: chirino
Date: Tue Mar 11 14:17:51 2008
New Revision: 636106

URL: http://svn.apache.org/viewvc?rev=636106&view=rev
Log:
Made the ReconnectTest a little more robust.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=636106&r1=636105&r2=636106&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Tue Mar 11 14:17:51 2008
@@ -194,15 +194,13 @@
 
 
     public final void handleTransportFailure(IOException e) throws InterruptedException {
-        if (transportListener != null) {
-            transportListener.transportInterupted();
-        }
         
         Transport transport = connectedTransport.get();
         if( transport!=null ) {
             ServiceSupport.dispose(transport);
         }
         
+        boolean wasConnected=false;            
         synchronized (reconnectMutex) {
             boolean reconnectOk = false;
             if(started) {
@@ -212,6 +210,7 @@
             }
             
             if (connectedTransport.get() != null) {
+                wasConnected=true;
                 initialized = false;
                 failedConnectTransportURI=connectedTransportURI;
                 connectedTransport.set(null);
@@ -223,6 +222,12 @@
             	reconnectTask.wakeup();
             }
         }
+
+        // Avoid double firing a transportInterupted() event due to an extra IOException
+        if (transportListener != null && wasConnected) {
+            transportListener.transportInterupted();
+        }
+
     }
 
     public void start() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?rev=636106&r1=636105&r2=636106&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
Tue Mar 11 14:17:51 2008
@@ -25,13 +25,13 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -55,10 +55,11 @@
 
     private BrokerService bs;
     private URI tcpUri;
+    private AtomicInteger resumedCount = new AtomicInteger();
     private AtomicInteger interruptedCount = new AtomicInteger();
     private Worker[] workers;
 
-    class Worker implements Runnable, ExceptionListener {
+    class Worker implements Runnable {
 
         public AtomicInteger iterations = new AtomicInteger();
         public CountDownLatch stopped = new CountDownLatch(1);
@@ -68,12 +69,11 @@
         private Throwable error;
         private String name;
 
-        public Worker(String name) throws URISyntaxException, JMSException {
+        public Worker(final String name) throws URISyntaxException, JMSException {
             this.name=name;
             URI uri = new URI("failover://(mock://(" + tcpUri + "))");
             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
             connection = (ActiveMQConnection)factory.createConnection();
-            connection.setExceptionListener(this);
             connection.addTransportListener(new TransportListener() {
                 public void onCommand(Object command) {
                 }
@@ -83,10 +83,13 @@
                 }
 
                 public void transportInterupted() {
+                    LOG.info("Worker "+name+" was interrupted...");
                     interruptedCount.incrementAndGet();
                 }
 
                 public void transportResumed() {
+                    LOG.info("Worker "+name+" was resummed...");
+                    resumedCount.incrementAndGet();
                 }
             });
             connection.start();
@@ -139,11 +142,6 @@
             }
         }
 
-        public void onException(JMSException error) {
-            setError(error);
-            stop();
-        }
-
         public synchronized Throwable getError() {
             return error;
         }
@@ -155,7 +153,7 @@
         public synchronized void assertNoErrors() {
             if (error != null) {
                 error.printStackTrace();
-                fail("Got Exception: " + error);
+                fail("Worker "+name+" got Exception: " + error);
             }
         }
 
@@ -163,18 +161,23 @@
 
     public void testReconnects() throws Exception {
 
-        for (int k = 1; k < 5; k++) {
+        for (int k = 1; k < 10; k++) {
 
             LOG.info("Test run: " + k);
 
             // Wait for at least one iteration to occur...
             for (int i = 0; i < WORKER_COUNT; i++) {
-                for (int j = 0; workers[i].iterations.get() == 0 && j < 5; j++)
{
+               int c=0;
+                for (int j = 0; j < 30; j++) {
+                       c = workers[i].iterations.getAndSet(0);
+                       if( c != 0 ) {
+                               break;
+                       }
                     workers[i].assertNoErrors();
-                    LOG.info("Waiting for worker " + i + " to finish an iteration.");
+                    LOG.info("Test run "+k+": Waiting for worker " + i + " to finish an iteration.");
                     Thread.sleep(1000);
                 }
-                assertTrue("Worker " + i + " never completed an interation.", workers[i].iterations.get()
!= 0);
+                assertTrue("Test run "+k+": Worker " + i + " never completed an interation.",
c != 0);
                 workers[i].assertNoErrors();
             }
 
@@ -185,21 +188,35 @@
                 workers[i].failConnection();
             }
 
+            long start;
             // Wait for the connections to get interrupted...
+            start = System.currentTimeMillis();
             while (interruptedCount.get() < WORKER_COUNT) {
-                LOG.info("Waiting for connections to get interrupted.. at: " + interruptedCount.get());
+               if( System.currentTimeMillis()-start > 1000*60 ) {
+                      fail("Timed out waiting for all connections to be interrupted.");
+               }
+                LOG.info("Test run "+k+": Waiting for connections to get interrupted.. at:
" + interruptedCount.get());
                 Thread.sleep(1000);
             }
 
-            // let things stablize..
-            LOG.info("Pausing before starting next iterations...");
-            Thread.sleep(1000);
+            // Wait for the connections to re-establish...
+            start = System.currentTimeMillis();
+            while (resumedCount.get() < WORKER_COUNT) {
+               if( System.currentTimeMillis()-start > 1000*60 ) {
+                       fail("Timed out waiting for all connections to be resumed.");
+               }
+                LOG.info("Test run "+k+": Waiting for connections to get resumed.. at: "
+ resumedCount.get());
+                Thread.sleep(1000);
+            }
 
             // Reset the counters..
             interruptedCount.set(0);
+            resumedCount.set(0);
             for (int i = 0; i < WORKER_COUNT; i++) {
                 workers[i].iterations.set(0);
             }
+            
+            Thread.sleep(1000);
 
         }
 



Mime
View raw message