activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1183062 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/discovery/simple/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/network/ test/...
Date Thu, 13 Oct 2011 20:07:49 GMT
Author: gtully
Date: Thu Oct 13 20:07:48 2011
New Revision: 1183062

URL: http://svn.apache.org/viewvc?rev=1183062&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3542 - Using failover: with static discovery in
a network connector to choose from a master/slave tuple leads to hangs and invalid states.
Fixes an issue with the demand forwarding bridge reacting to failover transport interrupt/resume, which led to
race conditions, and a race condition in tracking bridges across restarts. Changes maxReconnectAttempts so that 0
means no reconnect attempts and -1 means infinite. The default behaviour is still infinite. To reference a master/slave
pair, use: static:(failover:(a,b)?maxReconnectAttempts=0)?useExponentialBackOff=false. See
org.apache.activemq.network.FailoverStaticNetworkTest.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Oct 13 20:07:48 2011
@@ -116,7 +116,6 @@ public abstract class DemandForwardingBr
     protected CountDownLatch localStartedLatch = new CountDownLatch(1);
     protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
     protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
-    protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
     protected NetworkBridgeFilterFactory filterFactory;
@@ -163,7 +162,7 @@ public abstract class DemandForwardingBr
                     serviceLocalException(error);
                 }
             });
-            remoteBroker.setTransportListener(new TransportListener() {
+            remoteBroker.setTransportListener(new DefaultTransportListener() {
 
                 public void onCommand(Object o) {
                     Command command = (Command) o;
@@ -174,55 +173,6 @@ public abstract class DemandForwardingBr
                     serviceRemoteException(error);
                 }
 
-                public void transportInterupted() {
-                    // clear any subscriptions - to try and prevent the bridge
-                    // from stalling the broker
-                    if (remoteInterupted.compareAndSet(false, true)) {
-                        LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
-                        if (localBridgeStarted.get()) {
-                            clearDownSubscriptions();
-                            synchronized (DemandForwardingBridgeSupport.this) {
-                                try {
-                                    localBroker.oneway(localConnectionInfo.createRemoveCommand());
-                                } catch (TransportDisposedIOException td) {
-                                    LOG.debug("local broker is now disposed", td);
-                                } catch (IOException e) {
-                                    LOG.warn("Caught exception from local start", e);
-                                }
-                            }
-                        }
-                        localBridgeStarted.set(false);
-                        remoteBridgeStarted.set(false);
-                        startedLatch = new CountDownLatch(2);
-                        localStartedLatch = new CountDownLatch(1);
-                    }
-                }
-
-                public void transportResumed() {
-                    if (remoteInterupted.compareAndSet(true, false)) {
-                        // We want to slow down false connects so that we don't
-                        // get in a busy loop.
-                        // False connects can occurr if you using SSH tunnels.
-                        if (!lastConnectSucceeded.get()) {
-                            try {
-                                LOG.debug("Previous connection was never fully established.
Sleeping for second to avoid busy loop.");
-                                Thread.sleep(1000);
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            }
-                        }
-                        lastConnectSucceeded.set(false);
-                        try {
-                            startLocalBridge();
-                            remoteBridgeStarted.set(true);
-                            startedLatch.countDown();
-                            LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
-                        } catch (Throwable e) {
-                            LOG.error("Caught exception  from local start in resume transport",
e);
-                            serviceLocalException(e);
-                        }
-                    }
-                }
             });
 
             localBroker.start();
@@ -260,7 +210,7 @@ public abstract class DemandForwardingBr
         asyncTaskRunner.execute(new Runnable() {
             public void run() {
                 final String originalName = Thread.currentThread().getName();
-                Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
+                Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
                 try {
                     startRemoteBridge();
                 } catch (Exception e) {
@@ -782,14 +732,7 @@ public abstract class DemandForwardingBr
                     serviceLocalBrokerInfo(command);
                 } else if (command.isShutdownInfo()) {
                     LOG.info(configuration.getBrokerName() + " Shutting down");
-                    // Don't shut down the whole connector if the remote side
-                    // was interrupted.
-                    // the local transport is just shutting down temporarily
-                    // until the remote side
-                    // is restored.
-                    if (!remoteInterupted.get()) {
-                        stop();
-                    }
+                    stop();
                 } else if (command.getClass() == ConnectionError.class) {
                     ConnectionError ce = (ConnectionError) command;
                     serviceLocalException(ce.getException());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Thu Oct 13 20:07:48 2011
@@ -90,9 +90,11 @@ public class DiscoveryNetworkConnector e
                 return;
             }
             // Should we try to connect to that URI?
-            if( bridges.containsKey(uri) ) {
-                LOG.debug("Discovery agent generated a duplicate onServiceAdd event for:
"+uri );
-                return;
+            synchronized (bridges) {
+                if( bridges.containsKey(uri) ) {
+                    LOG.debug("Discovery agent generated a duplicate onServiceAdd event for:
"+uri );
+                    return;
+                }
             }
             if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri)))
{
                 LOG.debug("not connecting loopback: " + uri);
@@ -132,7 +134,9 @@ public class DiscoveryNetworkConnector e
             NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
             try {
                 bridge.start();
-                bridges.put(uri, bridge);
+                synchronized (bridges) {
+                    bridges.put(uri, bridge);
+                }
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);
@@ -158,12 +162,13 @@ public class DiscoveryNetworkConnector e
                 return;
             }
 
-            NetworkBridge bridge = bridges.remove(uri);
-            if (bridge == null) {
-                return;
+            NetworkBridge bridge;
+            synchronized (bridges) {
+                bridge = bridges.remove(uri);
+            }
+            if (bridge != null) {
+                ServiceSupport.dispose(bridge);
             }
-
-            ServiceSupport.dispose(bridge);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Thu Oct 13 20:07:48 2011
@@ -58,6 +58,10 @@ public class SimpleDiscoveryAgent implem
             super(service);
         }
 
+        @Override
+        public String toString() {
+            return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures
+ "]";
+        }
     }
 
     public void setDiscoveryListener(DiscoveryListener listener) {
@@ -118,7 +122,7 @@ public class SimpleDiscoveryAgent implem
                         event.connectFailures++;
 
                         if (maxReconnectAttempts > 0 && event.connectFailures
>= maxReconnectAttempts) {
-                            LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+"
tries.  Reconnecting has been disabled.");
+                            LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+"
tries.  Reconnecting has been disabled for: " + event);
                             return;
                         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Thu Oct 13 20:07:48 2011
@@ -67,6 +67,7 @@ public class FailoverTransport implement
 
     private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
     private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
+    private static final int INFINITE = -1;
     private TransportListener transportListener;
     private boolean disposed;
     private boolean connected;
@@ -89,11 +90,11 @@ public class FailoverTransport implement
     private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
     private long maxReconnectDelay = 1000 * 30;
     private double backOffMultiplier = 2d;
-    private long timeout = -1;
+    private long timeout = INFINITE;
     private boolean useExponentialBackOff = true;
     private boolean randomize = true;
-    private int maxReconnectAttempts;
-    private int startupMaxReconnectAttempts;
+    private int maxReconnectAttempts = INFINITE;
+    private int startupMaxReconnectAttempts = INFINITE;
     private int connectFailures;
     private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
     private Exception connectionFailure;
@@ -107,8 +108,6 @@ public class FailoverTransport implement
     private int maxCacheSize = 128 * 1024;
     private final TransportListener disposedListener = new DefaultTransportListener() {
     };
-    //private boolean connectionInterruptProcessingComplete;
-
     private final TransportListener myTransportListener = createTransportListener();
     private boolean updateURIsSupported = true;
     private boolean reconnectSupported = true;
@@ -222,12 +221,12 @@ public class FailoverTransport implement
 
             boolean reconnectOk = false;
             synchronized (reconnectMutex) {
-                if (started) {
-                    LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to
" + connectedTransportURI
-                            + " , attempting to automatically reconnect due to: " + e);
-                    LOG.debug("Transport failed with the following exception:", e);
+                if (canReconnect()) {
                     reconnectOk = true;
                 }
+                LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason:
 " + e
+                        + (reconnectOk ? "," : ", not")  +" attempting to automatically reconnect");
+
                 initialized = false;
                 failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
@@ -240,11 +239,17 @@ public class FailoverTransport implement
 
                 if (reconnectOk) {
                     reconnectTask.wakeup();
+                } else {
+                    propagateFailureToExceptionListener(e);
                 }
             }
         }
     }
 
+    private boolean canReconnect() {
+        return started && 0 != calculateReconnectAttemptLimit();
+    }
+
     public final void handleConnectionControl(ConnectionControl control) {
         String reconnectStr = control.getReconnectTo();
         if (reconnectStr != null) {
@@ -292,7 +297,9 @@ public class FailoverTransport implement
 
     public void start() throws Exception {
         synchronized (reconnectMutex) {
-            LOG.debug("Started.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Started " + this);
+            }
             if (started) {
                 return;
             }
@@ -311,7 +318,9 @@ public class FailoverTransport implement
     public void stop() throws Exception {
         Transport transportToStop = null;
         synchronized (reconnectMutex) {
-            LOG.debug("Stopped.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopped " + this);
+            }
             if (!started) {
                 return;
             }
@@ -825,9 +834,7 @@ public class FailoverTransport implement
                         doRebalance = false;
                     }
 
-                    if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY)
{
-                        reconnectDelay = initialReconnectDelay;
-                    }
+                    resetReconnectDelay();
 
                     Transport transport = null;
                     URI uri = null;
@@ -845,7 +852,9 @@ public class FailoverTransport implement
                     // for the first time, or we were disposed for some reason.
                     if (transport == null && !firstConnection && (reconnectDelay
> 0) && !disposed) {
                         synchronized (sleepMutex) {
-                            LOG.debug("Waiting " + reconnectDelay + " ms before attempting
connection. ");
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Waiting " + reconnectDelay + " ms before attempting
connection. ");
+                            }
                             try {
                                 sleepMutex.wait(reconnectDelay);
                             } catch (InterruptedException e) {
@@ -868,16 +877,18 @@ public class FailoverTransport implement
                             }
 
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug("Attempting connect to: " + uri);
+                                LOG.debug("Attempting  " + connectFailures + "th  connect
to: " + uri);
                             }
                             transport.setTransportListener(myTransportListener);
                             transport.start();
 
-                            if (started) {
+                            if (started &&  !firstConnection) {
                                 restoreTransport(transport);
                             }
 
-                            LOG.debug("Connection established");
+                             if (LOG.isDebugEnabled()) {
+                                LOG.debug("Connection established");
+                             }
                             reconnectDelay = initialReconnectDelay;
                             connectedTransportURI = uri;
                             connectedTransport.set(transport);
@@ -899,7 +910,9 @@ public class FailoverTransport implement
                             if (transportListener != null) {
                                 transportListener.transportResumed();
                             } else {
-                                LOG.debug("transport resumed by transport listener not set");
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("transport resumed by transport listener not
set");
+                                }
                             }
 
                             if (firstConnection) {
@@ -934,19 +947,10 @@ public class FailoverTransport implement
                 }
             }
 
-            int reconnectAttempts = 0;
-            if (firstConnection) {
-                if (this.startupMaxReconnectAttempts != 0) {
-                    reconnectAttempts = this.startupMaxReconnectAttempts;
-                }
-            }
-
-            if (reconnectAttempts == 0) {
-                reconnectAttempts = this.maxReconnectAttempts;
-            }
+            int reconnectLimit = calculateReconnectAttemptLimit();
 
-            if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts)
{
-                LOG.error("Failed to connect to transport after: " + connectFailures + "
attempt(s)");
+            if (reconnectLimit != INFINITE && ++connectFailures >= reconnectLimit)
{
+                LOG.error("Failed to connect to " + uris + " after: " + connectFailures +
" attempt(s)");
                 connectionFailure = failure;
 
                 // Make sure on initial startup, that the transportListener has been
@@ -960,14 +964,7 @@ public class FailoverTransport implement
                     }
                 }
 
-                if (transportListener != null) {
-                    if (connectionFailure instanceof IOException) {
-                        transportListener.onException((IOException) connectionFailure);
-                    } else {
-                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
-                    }
-                }
-                reconnectMutex.notifyAll();
+                propagateFailureToExceptionListener(connectionFailure);
                 return false;
             }
         }
@@ -976,7 +973,9 @@ public class FailoverTransport implement
 
             if (reconnectDelay > 0) {
                 synchronized (sleepMutex) {
-                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection.
");
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
+                    }
                     try {
                         sleepMutex.wait(reconnectDelay);
                     } catch (InterruptedException e) {
@@ -997,6 +996,34 @@ public class FailoverTransport implement
         return !disposed;
     }
 
+    private void resetReconnectDelay() {
+        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY)
{
+            reconnectDelay = initialReconnectDelay;
+        }
+    }
+
+    /*
+      * called with reconnectMutex held
+     */
+    private void propagateFailureToExceptionListener(Exception exception) {
+        if (transportListener != null) {
+            if (exception instanceof IOException) {
+                transportListener.onException((IOException)exception);
+            } else {
+                transportListener.onException(IOExceptionSupport.create(exception));
+            }
+        }
+        reconnectMutex.notifyAll();
+    }
+
+    private int calculateReconnectAttemptLimit() {
+        int maxReconnectValue = this.maxReconnectAttempts;
+        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
+            maxReconnectValue = this.startupMaxReconnectAttempts;
+        }
+        return maxReconnectValue;
+    }
+
     final boolean buildBackups() {
         synchronized (backupMutex) {
             if (!disposed && backup && backups.size() < backupPoolSize)
{

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Thu Oct 13 20:07:48 2011
@@ -17,12 +17,15 @@
 package org.apache.activemq.network;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import static org.junit.Assert.assertEquals;
@@ -31,6 +34,7 @@ import static org.junit.Assert.assertTru
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.management.ObjectName;
@@ -42,8 +46,10 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
 import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,12 +76,16 @@ public class FailoverStaticNetworkTest {
     protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts,
                                          HashMap<String, String> networkProps) throws
Exception {
         BrokerService broker = new BrokerService();
-        //broker.setUseJmx(false);
         broker.getManagementContext().setCreateConnector(false);
         broker.setSslContext(sslContext);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setBrokerName("Broker_" + listenPort);
-        broker.addConnector(scheme + "://localhost:" + listenPort);
+        // lazy init listener on broker start
+        TransportConnector transportConnector = new TransportConnector();
+        transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        transportConnectors.add(transportConnector);
+        broker.setTransportConnectors(transportConnectors);
         if (networkToPorts != null && networkToPorts.length > 0) {
             StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
             builder.append(networkToPorts[0]);
@@ -84,7 +94,7 @@ public class FailoverStaticNetworkTest {
             }
             // limit the reconnects in case of initial random connection to slave
             // leaving randomize on verifies that this config is picked up
-            builder.append(")?maxReconnectAttempts=1)");
+            builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
             NetworkConnector nc = broker.addNetworkConnector(builder.toString());
             if (networkProps != null) {
                 IntrospectionSupport.setProperties(nc, networkProps);
@@ -309,11 +319,89 @@ public class FailoverStaticNetworkTest {
         doTestNetworkSendReceive();
     }
 
+    @Test
+    public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
+
+        brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"});
+        brokerB.start();
+
+        final AtomicBoolean done = new AtomicBoolean(false);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!done.get()) {
+                        brokerA = createBroker("tcp", "61610", null);
+                        brokerA.setBrokerName("Pair");
+                        brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName()
+ ":" + "BrokerName="
+                                + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
+                        ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+                        brokerA.start();
+                        brokerA.waitUntilStopped();
+
+                        // restart after peer taken over
+                        brokerA1.waitUntilStarted();
+                    }
+                } catch (Exception ignored) {
+                    LOG.info("A create/start, unexpected: " + ignored, ignored);
+                }
+            }
+        });
+
+        // start with brokerA as master
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerA != null && brokerA.waitUntilStarted();
+            }
+        });
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!done.get()) {
+                        brokerA1 = createBroker("tcp", "61611", null);
+                        brokerA1.setBrokerName("Pair");
+                        // so they can coexist in local jmx we set the object name b/c the
brokername identifies the shared store
+                        brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName()
+ ":" + "BrokerName="
+                            + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
+                        ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+                        brokerA1.start();
+                        brokerA1.waitUntilStopped();
+
+                        // restart after peer taken over
+                        brokerA.waitUntilStarted();
+                    }
+                } catch (Exception ignored) {
+                    LOG.info("A1 create/start, unexpected: " + ignored, ignored);
+                }
+            }
+        });
+
+        for (int i=0; i<10; i++) {
+            BrokerService currentMaster =  (i%2 == 0 ? brokerA : brokerA1);
+            LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+            currentMaster.waitUntilStarted();
+
+            doTestNetworkSendReceive(brokerB, currentMaster);
+
+            LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+            currentMaster.stop();
+            currentMaster.waitUntilStopped();
+        }
+
+        done.set(false);
+        LOG.info("all done");
+        executorService.shutdownNow();
+    }
+
     private void doTestNetworkSendReceive() throws Exception, JMSException {
         doTestNetworkSendReceive(brokerB, brokerA);
     }
 
-    private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception,
JMSException {
+    private void doTestNetworkSendReceive(final BrokerService to, final BrokerService from)
throws Exception, JMSException {
 
         LOG.info("Creating Consumer on the networked broker ..." + from);
         
@@ -332,7 +420,9 @@ public class FailoverStaticNetworkTest {
         
         boolean gotMessage = Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                return consumer.receive(1000) != null;
+                Message message = consumer.receive(5000);
+                LOG.info("from:  " + from.getBrokerObjectName().getKeyProperty("BrokerName")
+  ", received: " + message);
+                return message != null;
             }      
         });
         try {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryUriTest.java
Thu Oct 13 20:07:48 2011
@@ -54,7 +54,7 @@ public class DiscoveryUriTest extends Em
 
     public void testFailedConnect() throws Exception {
         try {
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&maxReconnectAttempts=3&useExponentialBackOff=false");
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&startupMaxReconnectAttempts=3&useExponentialBackOff=false");
             Connection conn = factory.createConnection();
             conn.start();
         } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java	Thu Oct 13 20:07:48 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.fa
 import java.io.IOException;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.net.ServerSocketFactory;
@@ -30,17 +31,20 @@ import org.apache.activemq.util.Wait;
 
 public class SlowConnectionTest extends TestCase {
 
+    private CountDownLatch socketReadyLatch = new CountDownLatch(1);
+
     public void testSlowConnection() throws Exception {
 
+        MockBroker broker = new MockBroker();
+        broker.start();
+
+        socketReadyLatch.await();
         int timeout = 1000;
-        URI tcpUri = new URI("tcp://localhost:61616?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
+        URI tcpUri = new URI("tcp://localhost:" + broker.ss.getLocalPort() + "?soTimeout=" + timeout + "&trace=true&connectionTimeout=" + timeout + "&wireFormat.maxInactivityDurationInitalDelay=" + timeout);
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")");
         final Connection connection = cf.createConnection();
 
-        MockBroker broker = new MockBroker();
-        broker.start();
-
         new Thread(new Runnable() {
             public void run() {
                 try { connection.start(); } catch (Throwable ignored) {}
@@ -62,19 +66,25 @@ public class SlowConnectionTest extends 
     }
 
     class MockBroker extends Thread {
+        ServerSocket ss = null;
+        public MockBroker() {
+            super("MockBroker");
+        }
 
         public void run() {
 
             List<Socket> inProgress = new ArrayList<Socket>();
             ServerSocketFactory factory = ServerSocketFactory.getDefault();
-            ServerSocket ss = null;
 
             try {
-                ss = factory.createServerSocket(61616);
+                ss = factory.createServerSocket(0);
+                ss.setSoTimeout(5000);
 
+                socketReadyLatch.countDown();
                 while (!interrupted()) {
                     inProgress.add(ss.accept());    // eat socket
                 }
+            } catch (java.net.SocketTimeoutException expected) {
             } catch (Exception e) {
                 e.printStackTrace();
             } finally {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java	Thu Oct 13 20:07:48 2011
@@ -320,7 +320,7 @@ public class DurableSubProcessTest exten
                 "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" +
                 "jms.producerWindowSize=20971520&" +
                 "jms.copyMessageOnSend=false&" +
-                "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" +
+                "initialReconnectDelay=100&maxReconnectDelay=30000&" +
                 "useExponentialBackOff=true";
         final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java?rev=1183062&r1=1183061&r2=1183062&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java	Thu Oct 13 20:07:48 2011
@@ -398,7 +398,7 @@ public class DurableSubProcessWithRestar
                 + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
                 + "jms.producerWindowSize=20971520&"
                 + "jms.copyMessageOnSend=false&"
-                + "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&"
+                + "initialReconnectDelay=100&maxReconnectDelay=30000&"
                 + "useExponentialBackOff=true";
         final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
 



Mime
View raw message