activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r904568 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/bugs/AMQ2183Test.java
Date Fri, 29 Jan 2010 17:39:20 GMT
Author: gtully
Date: Fri Jan 29 17:39:20 2010
New Revision: 904568

URL: http://svn.apache.org/viewvc?rev=904568&view=rev
Log:
resolve timing issue with reconnect in failover transport that can stall reconnect logic -
resulted in intermittent failure of AMQ2183Test. reorg of test make it reproducable when master
is waiting for slave to connect and its disconnects ocurr async so they can clash with inprogress
reconnects - resolve https://issues.apache.org/activemq/browse/AMQ-2588

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=904568&r1=904567&r2=904568&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Fri Jan 29 17:39:20 2010
@@ -195,9 +195,17 @@
 
 
     public final void handleTransportFailure(IOException e) throws InterruptedException {
-        
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(this + " handleTransportFailure: " + e);
+        }
         Transport transport = connectedTransport.getAndSet(null);
-        if( transport!=null ) {
+        if (transport == null) {
+            // sync with possible in progress reconnect
+            synchronized (reconnectMutex) {
+                transport = connectedTransport.getAndSet(null); 
+            }
+        }
+        if (transport != null) {
             
             transport.setTransportListener(disposedListener);
             ServiceSupport.dispose(transport);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java?rev=904568&r1=904567&r2=904568&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java Fri
Jan 29 17:39:20 2010
@@ -17,10 +17,15 @@
 package org.apache.activemq.bugs;
 
 
+import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -29,8 +34,6 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -79,7 +82,9 @@
         t.start();
         Thread.sleep(2000);
         masterUrl = master.getTransportConnectors().get(0).getConnectUri();
-        
+    }
+
+    private void startSlave() throws IOException, Exception, URISyntaxException {
         slave.setBrokerName("Slave");
         slave.deleteAllMessages();
         slave.addConnector("tcp://localhost:0");
@@ -111,8 +116,27 @@
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 "failover:(" + masterUrl + ")?randomize=false");
     
-        Connection connection = connectionFactory.createConnection();
-        connection.start();
+        final Connection connection = connectionFactory.createConnection();
+        final CountDownLatch startCommenced = new CountDownLatch(1);
+        final CountDownLatch startDone = new CountDownLatch(1);
+        
+        // start will be blocked pending slave connection but should resume after slave started
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                startCommenced.countDown();
+                try {
+                    connection.start();
+                    startDone.countDown();
+                } catch (Exception e) {
+                    exceptions.put(Thread.currentThread(), e);
+                }
+            }});
+        
+       
+        assertTrue("connection.start has commenced", startCommenced.await(10, TimeUnit.SECONDS));
+        startSlave();
+        assertTrue("connection.start done", startDone.await(70, TimeUnit.SECONDS));
+        
         final MessageCounter counterA = new MessageCounter();
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA);
        



Mime
View raw message