activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r733761 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Date Mon, 12 Jan 2009 14:05:58 GMT
Author: dejanb
Date: Mon Jan 12 06:05:57 2009
New Revision: 733761

URL: http://svn.apache.org/viewvc?rev=733761&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2061

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=733761&r1=733760&r2=733761&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Jan 12 06:05:57 2009
@@ -82,6 +82,7 @@
     private long initialReconnectDelay = 10;
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
+    private long timeout = -1;
     private boolean useExponentialBackOff = true;
     private boolean randomize = true;
     private boolean initialized;
@@ -318,7 +319,15 @@
         this.maxReconnectAttempts = maxReconnectAttempts;
     }
 
-    /**
+    public long getTimeout() {
+		return timeout;
+	}
+
+	public void setTimeout(long timeout) {
+		this.timeout = timeout;
+	}
+
+	/**
      * @return Returns the randomize.
      */
     public boolean isRandomize() {
@@ -380,7 +389,7 @@
         try {
 
             synchronized (reconnectMutex) {
- 
+            	
                 if (isShutdownCommand(command) && connectedTransport.get() == null) {
                     if(command.isShutdownInfo()) {
                         // Skipping send of ShutdownInfo command when not connected.
@@ -393,19 +402,27 @@
                         myTransportListener.onCommand(response);
                         return;
                     }
-                }                      
+                }
                 // Keep trying until the message is sent.
                 for (int i = 0; !disposed; i++) {
                     try {
 
                         // Wait for transport to be connected.
                         Transport transport = connectedTransport.get();
+                        long start = System.currentTimeMillis();
+                        boolean timedout = false;
                         while (transport == null && !disposed
                                 && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted()) {
                             LOG.trace("Waiting for transport to reconnect.");
+                            long end = System.currentTimeMillis();
+                            if (timeout > 0 && (end - start > timeout)) {
+                            	timedout = true;
+                            	LOG.info("Failover timed out after " + (end - start) + "ms");
+                            	break;
+                            }
                             try {
-                                reconnectMutex.wait(1000);
+                                reconnectMutex.wait(100);
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
                                 LOG.debug("Interupted: " + e, e);
@@ -420,7 +437,9 @@
                                 error = new IOException("Transport disposed.");
                             } else if (connectionFailure != null) {
                                 error = connectionFailure;
-                            } else {
+                            } else if (timedout == true) {
+                            	error = new IOException("Failover timeout of " + timeout + " ms reached.");
+                            }else {
                                 error = new IOException("Unexpected failure.");
                             }
                             break;
@@ -632,7 +651,6 @@
     }
     
    final boolean doReconnect() {
-
         Exception failure = null;
         synchronized (reconnectMutex) {
 
@@ -724,7 +742,7 @@
                     }
                 }
             }
-
+            
             if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java?rev=733761&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java Mon Jan 12 06:05:57 2009
@@ -0,0 +1,58 @@
+package org.apache.activemq.transport.failover;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+
+public class FailoverTimeoutTest extends TestCase {
+	
+	private static final String QUEUE_NAME = "test.failovertimeout";
+
+	public void testTimeout() throws Exception {
+		
+		long timeout = 1000;
+		URI tcpUri = new URI("tcp://localhost:61616");
+		BrokerService bs = new BrokerService();
+		bs.setUseJmx(false);
+		bs.addConnector(tcpUri);
+		bs.start();
+		
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout);
+		Connection connection = cf.createConnection();
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		MessageProducer producer = session.createProducer(session
+				.createQueue(QUEUE_NAME));
+		TextMessage message = session.createTextMessage("Test message");
+		producer.send(message);
+		
+		bs.stop();
+		
+		try {
+			producer.send(message);
+		} catch (JMSException jmse) {
+			jmse.printStackTrace();
+			assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
+		}
+		
+		bs = new BrokerService();
+		
+		bs.setUseJmx(false);
+		bs.addConnector(tcpUri);
+		bs.start();
+		
+		producer.send(message);
+		
+		bs.stop();
+	}
+	
+}



Mime
View raw message