activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4785 - fix and test
Date Fri, 04 Oct 2013 10:35:37 GMT
Updated Branches:
  refs/heads/trunk 8572bada7 -> 4f19f31a3


https://issues.apache.org/jira/browse/AMQ-4785 - fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4f19f31a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4f19f31a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4f19f31a

Branch: refs/heads/trunk
Commit: 4f19f31a37ca719a92967272cd751ce0c3a539b4
Parents: 8572bad
Author: gtully <gary.tully@gmail.com>
Authored: Fri Oct 4 11:34:53 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Oct 4 11:35:23 2013 +0100

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   | 10 +--
 .../failover/InitalReconnectDelayTest.java      | 87 +++++++++++++++++---
 2 files changed, 79 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4f19f31a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index df669bf..fc81f94 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -257,12 +257,12 @@ public class FailoverTransport implements CompositeTransport {
                 connected = false;
                 connectedToPriority = false;
 
-                // notify before any reconnect attempt so ack state can be whacked
-                if (transportListener != null) {
-                    transportListener.transportInterupted();
-                }
-
                 if (reconnectOk) {
+                    // notify before any reconnect attempt so ack state can be whacked
+                    if (transportListener != null) {
+                        transportListener.transportInterupted();
+                    }
+
                     updated.remove(failedConnectTransportURI);
                     reconnectTask.wakeup();
                 } else if (!isDisposed()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4f19f31a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index 5e728cb..8fe8c08 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -16,35 +16,41 @@
  */
 package org.apache.activemq.transport.failover;
 
-import static org.junit.Assert.assertTrue;
-
+import java.io.IOException;
 import java.util.Date;
-import java.util.concurrent.CountDownLatch;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.TransportListener;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static org.junit.Assert.*;
+
 public class InitalReconnectDelayTest {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
     protected BrokerService broker1;
     protected BrokerService broker2;
-    protected CountDownLatch broker2Started = new CountDownLatch(1);
-    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000";
 
     @Test
     public void testInitialReconnectDelay() throws Exception {
 
+        String uriString = "failover://(tcp://localhost:" +
+            broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
+            ",tcp://localhost:" +
+            broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+            ")?randomize=false&initialReconnectDelay=15000";
+
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
         Connection connection = connectionFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -72,6 +78,65 @@ public class InitalReconnectDelayTest {
         assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
     }
 
+    @Test
+    public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
+
+        String uriString = "failover://(tcp://localhost:" +
+            broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
+            ",tcp://localhost:" +
+            broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+            ")?randomize=false&maxReconnectAttempts=0";
+
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
+        final AtomicInteger calls = new AtomicInteger(0);
+        connectionFactory.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+            }
+
+            @Override
+            public void onException(IOException error) {
+                LOG.info("on exception: " + error);
+                calls.set(0x01 | calls.intValue());
+            }
+
+            @Override
+            public void transportInterupted() {
+                LOG.info("on transportInterupted");
+                calls.set(0x02 | calls.intValue());
+            }
+
+            @Override
+            public void transportResumed() {
+                LOG.info("on transportResumed");
+                calls.set(0x04 | calls.intValue());
+            }
+        });
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue("foo");
+        MessageProducer producer = session.createProducer(destination);
+
+        final Message message = session.createTextMessage("TEST");
+        producer.send(message);
+
+        // clear listener state
+        calls.set(0);
+
+        LOG.info("Stopping the Broker1...");
+        broker1.stop();
+
+        LOG.info("Attempting to send... failover should throw on disconnect");
+        try {
+            producer.send(destination, message);
+            fail("Expect IOException to bubble up on send");
+        } catch (javax.jms.IllegalStateException producerClosed) {
+        }
+
+        assertEquals("Only an exception is reported to the listener", 0x1, calls.get());
+    }
+
     @Before
     public void setUp() throws Exception {
 
@@ -82,7 +147,7 @@ public class InitalReconnectDelayTest {
         broker1.setBrokerName("broker1");
         broker1.setDeleteAllMessagesOnStartup(true);
         broker1.setDataDirectory(dataDir);
-        broker1.addConnector("tcp://localhost:62001");
+        broker1.addConnector("tcp://localhost:0");
         broker1.setUseJmx(false);
         broker1.start();
         broker1.waitUntilStarted();
@@ -91,7 +156,7 @@ public class InitalReconnectDelayTest {
         broker2.setBrokerName("broker2");
         broker2.setDataDirectory(dataDir);
         broker2.setUseJmx(false);
-        broker2.addConnector("tcp://localhost:62002");
+        broker2.addConnector("tcp://localhost:0");
         broker2.start();
         broker2.waitUntilStarted();
 
@@ -119,8 +184,4 @@ public class InitalReconnectDelayTest {
         }
     }
 
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(uriString);
-    }
-
 }


Mime
View raw message