activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5951 - scenario where transaction command can block, additional test and further fix
Date Wed, 30 Sep 2015 10:41:48 GMT
https://issues.apache.org/jira/browse/AMQ-5951 - scenario where transaction command can block,
additional test and further fix


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8514e381
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8514e381
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8514e381

Branch: refs/heads/master
Commit: 8514e38135cf3c4da913806f3677a89785613e10
Parents: 94b5697
Author: gtully <gary.tully@gmail.com>
Authored: Tue Sep 29 16:34:36 2015 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Sep 30 11:41:08 2015 +0100

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   |  8 +++-
 .../transport/failover/FailoverTimeoutTest.java | 48 ++++++++++++++++----
 2 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 4e196b3..0f36d67 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -607,7 +607,7 @@ public class FailoverTransport implements CompositeTransport {
                         long start = System.currentTimeMillis();
                         boolean timedout = false;
                         while (transport == null && !disposed && connectionFailure
== null
-                                && !Thread.currentThread().isInterrupted()) {
+                                && !Thread.currentThread().isInterrupted() &&
willReconnect()) {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Waiting for transport to reconnect..: " + command);
                             }
@@ -639,6 +639,8 @@ public class FailoverTransport implements CompositeTransport {
                                 error = connectionFailure;
                             } else if (timedout == true) {
                                 error = new IOException("Failover timeout of " + timeout
+ " ms reached.");
+                            } else if (!willReconnect()) {
+                                error = new IOException("Reconnect attempts of " + maxReconnectAttempts
+ " exceeded");
                             } else {
                                 error = new IOException("Unexpected failure.");
                             }
@@ -723,6 +725,10 @@ public class FailoverTransport implements CompositeTransport {
         }
     }
 
+    private boolean willReconnect() {
+        return firstConnection || 0 != calculateReconnectAttemptLimit();
+    }
+
     @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback)
throws IOException {
         throw new AssertionError("Unsupported Method");

http://git-wip-us.apache.org/repos/asf/activemq/blob/8514e381/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 35a970f..7c36840 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
@@ -37,7 +38,10 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -97,6 +101,14 @@ public class FailoverTimeoutTest {
         LOG.info("Time spent waiting to connect: {} ms", duration);
 
         assertTrue(duration > 3000);
+
+        safeClose(connection);
+    }
+
+    private void safeClose(Connection connection) {
+        try {
+            connection.close();
+        } catch (Exception ignored) {}
     }
 
     @Test
@@ -131,10 +143,29 @@ public class FailoverTimeoutTest {
     }
 
     @Test
-    public void testInterleaveSendAndException() throws Exception {
+    public void testInterleaveAckAndException() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri
+ ")?maxReconnectAttempts=0");
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+        doTestInterleaveAndException(connection, new MessageAck());
+        safeClose(connection);
+    }
 
+    @Test
+    public void testInterleaveTxAndException() throws Exception {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri
+ ")?maxReconnectAttempts=0");
         final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connection.getConnectionInfo().getConnectionId());
+        tx.setTransactionId(new LocalTransactionId(tx.getConnectionId(), 1));
+        doTestInterleaveAndException(connection, tx);
+
+        safeClose(connection);
+    }
+
+    public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command
command) throws Exception {
+
         connection.start();
 
         connection.setExceptionListener(new ExceptionListener() {
@@ -143,7 +174,7 @@ public class FailoverTimeoutTest {
                 try {
                     LOG.info("Deal with exception - invoke op that may block pending outstanding
oneway");
                     // try and invoke on connection as part of handling exception
-                    connection.asyncSendPacket(new MessageAck());
+                    connection.asyncSendPacket(command);
                 } catch (Exception e) {
                 }
             }
@@ -154,24 +185,24 @@ public class FailoverTimeoutTest {
         final int NUM_TASKS = 200;
         final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
 
+        // let a few tasks delay a bit
+        final AtomicLong sleepMillis = new AtomicLong(1000);
         for (int i=0; i < NUM_TASKS; i++) {
-
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        connection.asyncSendPacket(new MessageAck());
-                    } catch (JMSException e) {
-                        e.printStackTrace();
+                        TimeUnit.MILLISECONDS.sleep(Math.max(0, sleepMillis.addAndGet(-50)));
+                        connection.asyncSendPacket(command);
+                    } catch (Exception e) {
                     } finally {
                         enqueueOnExecutorDone.countDown();
                     }
-
                 }
             });
         }
 
-        while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
+        while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 10)) {
             enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
         }
 
@@ -184,6 +215,7 @@ public class FailoverTimeoutTest {
         assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
     }
 
+
     @Test
     public void testUpdateUris() throws Exception {
 


Mime
View raw message