activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r923272 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/ReconnectTest.java
Date Mon, 15 Mar 2010 14:49:18 GMT
Author: rajdavies
Date: Mon Mar 15 14:49:18 2010
New Revision: 923272

URL: http://svn.apache.org/viewvc?rev=923272&view=rev
Log:
Fix failing ReconnectTest

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=923272&r1=923271&r2=923272&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Mar 15 14:49:18 2010
@@ -104,9 +104,11 @@ public class FailoverTransport implement
     private int maxCacheSize = 128 * 1024;
     private final TransportListener disposedListener = new DefaultTransportListener() {
     };
-    private boolean connectionInterruptProcessingComplete;
+    //private boolean connectionInterruptProcessingComplete;
 
     private final TransportListener myTransportListener = createTransportListener();
+    private boolean updateURIsSupported=true;
+    private boolean reconnectSupported=true;
 
     public FailoverTransport() throws InterruptedIOException {
 
@@ -951,36 +953,46 @@ public class FailoverTransport implement
     }
 
     public boolean isReconnectSupported() {
-        return true;
+        return this.reconnectSupported;
     }
-
+    
+    public void setReconnectSupported(boolean value) {
+        this.reconnectSupported=value;
+    }
+   
     public boolean isUpdateURIsSupported() {
-        return true;
+        return this.updateURIsSupported;
+    }
+    
+    public void setUpdateURIsSupported(boolean value) {
+        this.updateURIsSupported=value;
     }
 
     public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
-        List<URI> copy = new ArrayList<URI>(this.updated);
-        List<URI> add = new ArrayList<URI>();
-        if (updatedURIs != null && updatedURIs.length > 0) {
-            Set<URI> set = new HashSet<URI>();
-            for (int i = 0; i < updatedURIs.length; i++) {
-                URI uri = updatedURIs[i];
-                if (uri != null) {
-                    set.add(uri);
-                }
-            }
-            for (URI uri : set) {
-                if (copy.remove(uri) == false) {
-                    add.add(uri);
-                }
-            }
-            synchronized (reconnectMutex) {
-                this.updated.clear();
-                this.updated.addAll(add);
-                for (URI uri : copy) {
-                    this.uris.remove(uri);
+        if (isUpdateURIsSupported()) {
+            List<URI> copy = new ArrayList<URI>(this.updated);
+            List<URI> add = new ArrayList<URI>();
+            if (updatedURIs != null && updatedURIs.length > 0) {
+                Set<URI> set = new HashSet<URI>();
+                for (int i = 0; i < updatedURIs.length; i++) {
+                    URI uri = updatedURIs[i];
+                    if (uri != null) {
+                        set.add(uri);
+                    }
+                }
+                for (URI uri : set) {
+                    if (copy.remove(uri) == false) {
+                        add.add(uri);
+                    }
+                }
+                synchronized (reconnectMutex) {
+                    this.updated.clear();
+                    this.updated.addAll(add);
+                    for (URI uri : copy) {
+                        this.uris.remove(uri);
+                    }
+                    add(rebalance, add.toArray(new URI[add.size()]));
                 }
-                add(rebalance, add.toArray(new URI[add.size()]));
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?rev=923272&r1=923271&r2=923272&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java Mon Mar 15 14:49:18 2010
@@ -23,15 +23,12 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -55,8 +52,8 @@ public class ReconnectTest extends TestC
 
     private BrokerService bs;
     private URI tcpUri;
-    private AtomicInteger resumedCount = new AtomicInteger();
-    private AtomicInteger interruptedCount = new AtomicInteger();
+    private final AtomicInteger resumedCount = new AtomicInteger();
+    private final AtomicInteger interruptedCount = new AtomicInteger();
     private Worker[] workers;
 
     class Worker implements Runnable {
@@ -64,14 +61,14 @@ public class ReconnectTest extends TestC
         public AtomicInteger iterations = new AtomicInteger();
         public CountDownLatch stopped = new CountDownLatch(1);
 
-        private ActiveMQConnection connection;
-        private AtomicBoolean stop = new AtomicBoolean(false);
+        private final ActiveMQConnection connection;
+        private final AtomicBoolean stop = new AtomicBoolean(false);
         private Throwable error;
-        private String name;
+        private final String name;
 
         public Worker(final String name) throws URISyntaxException, JMSException {
             this.name=name;
-            URI uri = new URI("failover://(mock://(" + tcpUri + "))");
+            URI uri = new URI("failover://(mock://(" + tcpUri + "))?updateURIsSupported=false");
             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
             connection = (ActiveMQConnection)factory.createConnection();
             connection.addTransportListener(new TransportListener() {
@@ -96,7 +93,7 @@ public class ReconnectTest extends TestC
         }
 
         public void failConnection() {
-            MockTransport mockTransport = (MockTransport)connection.getTransportChannel().narrow(MockTransport.class);
+            MockTransport mockTransport = connection.getTransportChannel().narrow(MockTransport.class);
             mockTransport.onException(new IOException("Simulated error"));
         }
 
@@ -222,6 +219,7 @@ public class ReconnectTest extends TestC
 
     }
 
+    @Override
     protected void setUp() throws Exception {
         bs = new BrokerService();
         bs.setPersistent(false);
@@ -238,6 +236,7 @@ public class ReconnectTest extends TestC
 
     }
 
+    @Override
     protected void tearDown() throws Exception {
         for (int i = 0; i < WORKER_COUNT; i++) {
             workers[i].stop();



Mime
View raw message