activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1069339 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq...
Date Thu, 10 Feb 2011 11:02:49 GMT
Author: gtully
Date: Thu Feb 10 11:02:48 2011
New Revision: 1069339

URL: http://svn.apache.org/viewvc?rev=1069339&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3176 - Potential deadlock in duplex network connector
recreation, resulting in dangling connections. https://issues.apache.org/jira/browse/AMQ-3129
- Can only have one duplex networkConnection per transportConnection. Rework of https://issues.apache.org/jira/browse/AMQ-2774.
NetworkConnector.name attribute (default NC) is used to differentiate duplex network connectors
so it needs to be unique to allow multiple to be configured. Removing existing matching network
connectors on a reconnect is now thread safe

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Feb 10 11:02:48 2011
@@ -150,7 +150,7 @@ public class TransportConnection impleme
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
-    private BrokerId	duplexRemoteBrokerId;
+    private String duplexNetworkConnectorId;
 
     /**
      * @param connector
@@ -946,7 +946,7 @@ public class TransportConnection impleme
                             serviceLock.writeLock().unlock();
                         }
                     }
-                });
+                }, "StopAsync:" + transport.getRemoteAddress());
             } catch (Throwable t) {
                 LOG.warn("cannot create async transport stopper thread.. not waiting for
stop to complete, reason:", t);
                 stopped.countDown();
@@ -1179,25 +1179,32 @@ public class TransportConnection impleme
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
+                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+                Map<String, String> props = createMap(properties);
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                IntrospectionSupport.setProperties(config, props, "");
+                config.setBrokerName(broker.getBrokerName());
+
+                // check for existing duplex connection hanging about
+
                 // We first look if existing network connection already exists for the same
broker Id and network connector name
                 // It's possible in case of brief network fault to have this transport connector
side of the connection always active
                 // and the duplex network connector side wanting to open a new one
                 // In this case, the old connection must be broken
-                BrokerId	remoteBrokerId = info.getBrokerId();
-                setDuplexRemoteBrokerId(remoteBrokerId);
+                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();

                 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
-                for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();)
{
-            		TransportConnection c = iter.next();
-                    if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId())))
{
-                        LOG.warn("An existing duplex active connection already exists for
this broker (" + remoteBrokerId + "). Stopping it.");
-                        c.stop();
+                synchronized (connections) {
+                    for (Iterator<TransportConnection> iter = connections.iterator();
iter.hasNext();) {
+                        TransportConnection c = iter.next();
+                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId())))
{
+                            LOG.warn("Stopping an existing active duplex connection [" +
c + "] for network connector (" + duplexNetworkConnectorId + ").");
+                            c.stopAsync();
+                            // better to wait for a bit rather than get connection id already
in use and failure to start new bridge
+                            c.getStopped().await(1, TimeUnit.SECONDS);
+                        }
                     }
+                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                 }
-                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
-                Map<String, String> props = createMap(properties);
-                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
-                IntrospectionSupport.setProperties(config, props, "");
-                config.setBrokerName(broker.getBrokerName());
                 URI uri = broker.getVmConnectorURI();
                 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
                 map.put("network", "true");
@@ -1217,13 +1224,14 @@ public class TransportConnection impleme
                 info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);
                 duplexBridge.duplexStart(this, brokerInfo, info);
-                LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
+                LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
                 return null;
             } catch (TransportDisposedIOException e) {
-                LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly
stopped before it was correctly started.");
+                LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before
it was correctly started.");
                 return null;
             } catch (Exception e) {
-                LOG.error("Creating duplex network bridge", e);
+                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId
, e);
+                return null;
             }
         }
         // We only expect to get one broker info command per connection
@@ -1415,11 +1423,15 @@ public class TransportConnection impleme
         return connectionStateRegister.lookupConnectionState(connectionId);
     }
 
-    protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) {
-        this.duplexRemoteBrokerId = remoteBrokerId;
+    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId)
{
+        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
     }
 
-    protected synchronized BrokerId getDuplexRemoteBrokerId() {
-        return this.duplexRemoteBrokerId;
+    protected synchronized String getDuplexNetworkConnectorId() {
+        return this.duplexNetworkConnectorId;
+    }
+    
+    protected CountDownLatch getStopped() {
+        return stopped;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Feb 10 11:02:48 2011
@@ -219,8 +219,9 @@ public abstract class DemandForwardingBr
                             remoteBridgeStarted.set(true);
                             startedLatch.countDown();
                             LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
-                        } catch (Exception e) {
+                        } catch (Throwable e) {
                             LOG.error("Caught exception  from local start in resume transport",
e);
+                            serviceLocalException(e);
                         }
                     }
                 }
@@ -248,7 +249,7 @@ public abstract class DemandForwardingBr
                 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
                 try {
                     startLocalBridge();
-                } catch (Exception e) {
+                } catch (Throwable e) {
                     serviceLocalException(e);
                 } finally {
                     Thread.currentThread().setName(originalName);
@@ -273,7 +274,7 @@ public abstract class DemandForwardingBr
         });
     }
 
-    protected void startLocalBridge() throws Exception {
+    protected void startLocalBridge() throws Throwable {
         if (localBridgeStarted.compareAndSet(false, true)) {
             synchronized (this) {
                 if (LOG.isTraceEnabled()) {
@@ -284,7 +285,7 @@ public abstract class DemandForwardingBr
                 if (!disposed.get()) {
                     localConnectionInfo = new ConnectionInfo();
                     localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                    localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
+                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_"
+ configuration.getBrokerName();
                     localConnectionInfo.setClientId(localClientId);
                     localConnectionInfo.setUserName(configuration.getUserName());
                     localConnectionInfo.setPassword(configuration.getPassword());
@@ -296,10 +297,14 @@ public abstract class DemandForwardingBr
                         X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
                         localConnectionInfo.setTransportContext(peerCerts);
                     }
-                    localBroker.oneway(localConnectionInfo);
-
+                    // sync requests that may fail
+                    Object resp = localBroker.request(localConnectionInfo);
+                    if (resp instanceof ExceptionResponse) {
+                        throw ((ExceptionResponse)resp).getException();
+                    }
                     localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                     localBroker.oneway(localSessionInfo);
+
                     brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
                     NetworkBridgeListener l = this.networkBridgeListener;
                     if (l != null) {
@@ -346,7 +351,7 @@ public abstract class DemandForwardingBr
                 }
                 remoteConnectionInfo = new ConnectionInfo();
                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                remoteConnectionInfo.setClientId("NC_" + configuration.getBrokerName() +
"_outbound");
+                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName()
+ "_outbound");
                 remoteConnectionInfo.setUserName(configuration.getUserName());
                 remoteConnectionInfo.setPassword(configuration.getPassword());
                 remoteBroker.oneway(remoteConnectionInfo);
@@ -857,7 +862,7 @@ public abstract class DemandForwardingBr
     /**
      * @return Returns the staticallyIncludedDestinations.
      */
-    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
+    public ActiveMQDestination[] getStaticallyIncludedestinations() {
         return staticallyIncludedDestinations;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Thu Feb 10 11:02:48 2011
@@ -232,15 +232,6 @@ public class DiscoveryNetworkConnector e
         return configureBridge(result);
     }
 
-    public String getName() {
-        String name = super.getName();
-        if (name == null) {
-            name = discoveryAgent.toString();
-            super.setName(name);
-        }
-        return name;
-    }
-
     @Override
     public String toString() {
         return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
Thu Feb 10 11:02:48 2011
@@ -269,20 +269,9 @@ public class      LdapNetworkConnector
       context.close();
    }
 
-   /**
-    * returns the name of the connector
-    *
-    * @return connector name
-    */
-   public String getName() {
-
-        String name = super.getName();
-        if (name == null) {
-            name = this.getClass().getName() + " [" + ldapURI.toString() + "]";
-            super.setName(name);
-        }
-        return name;
-    }
+   public String toString() {
+       return this.getClass().getName() + getName()  + "[" + ldapURI.toString() + "]";
+   }
 
    /**
      * add connector of the given URI

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java
Thu Feb 10 11:02:48 2011
@@ -141,13 +141,9 @@ public class MulticastNetworkConnector e
         }
     }
 
-    public String getName() {
-        String name = super.getName();
-        if(name == null) {
-            name = remoteTransport.toString();
-            super.setName(name);
-        }
-        return name;
+    @Override
+    public String toString() {
+        return getClass().getName() + ":" + getName() + "["  + remoteTransport.toString()
+ "]";
     }
 
     protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote)
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
Thu Feb 10 11:02:48 2011
@@ -39,7 +39,7 @@ public class NetworkBridgeConfiguration 
     private String userName;
     private String password;
     private String destinationFilter = ">";
-    private String name = null;
+    private String name = "NC";
     
     private List<ActiveMQDestination> excludedDestinations;
     private List<ActiveMQDestination> dynamicallyIncludedDestinations;
@@ -223,9 +223,6 @@ public class NetworkBridgeConfiguration 
      * @return the name
      */
     public String getName() {
-        if(this.name == null) {
-            this.name = "localhost";
-        }
         return this.name;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Thu Feb 10 11:02:48 2011
@@ -210,11 +210,11 @@ public abstract class NetworkConnector e
         if (localURI == null) {
             throw new IllegalStateException("You must configure the 'localURI' property");
         }
-        LOG.info("Network Connector " + getName() + " Started");
+        LOG.info("Network Connector " + this + " Started");
     }
 
     protected void handleStop(ServiceStopper stopper) throws Exception {
-        LOG.info("Network Connector " + getName() + " Stopped");
+        LOG.info("Network Connector " + this + " Stopped");
     }
 
     public ObjectName getObjectName() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Thu Feb 10 11:02:48 2011
@@ -243,7 +243,7 @@ public class InactivityMonitor extends T
             try {
 
                 if( failed.get() ) {
-                    throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
+                    throw new InactivityIOException("Cannot send, channel has already failed:
"+next.getRemoteAddress());
                 }
                 if (o.getClass() == WireFormatInfo.class) {
                     synchronized (this) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
Thu Feb 10 11:02:48 2011
@@ -134,4 +134,8 @@ public class VMTransportServer implement
     public InetSocketAddress getSocketAddress() {
         return null;
     }
+    
+    public int getConnectionCount() {
+        return connectionCount.intValue();
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Thu Feb 10 11:02:48 2011
@@ -107,7 +107,7 @@ public class DiscoveryNetworkReconnectTe
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
                     new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
-                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC"))));
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
           
                     new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
@@ -116,7 +116,7 @@ public class DiscoveryNetworkReconnectTe
                     new ObjectName("Test:BrokerName=BrokerNC,Type=jobScheduler,jobSchedulerName=JMS"))));
             
             atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)),
with(new NetworkBridgeObjectNameMatcher<ObjectName>(
-                        new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"

+                        new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_"

                             + proxy.getUrl().getPort())))); will(new CustomAction("signal
register network mbean") {
                                 public Object invoke(Invocation invocation) throws Throwable
{
                                     LOG.info("Mbean Registered: " + invocation.getParameter(0));
@@ -125,7 +125,7 @@ public class DiscoveryNetworkReconnectTe
                                 }
                             });
             atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(new NetworkBridgeObjectNameMatcher<ObjectName>(
-                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"

+                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=NC,Name=localhost/127.0.0.1_"

                             + proxy.getUrl().getPort())))); will(new CustomAction("signal
unregister network mbean") {
                                 public Object invoke(Invocation invocation) throws Throwable
{
                                     LOG.info("Mbean Unregistered: " + invocation.getParameter(0));
@@ -137,7 +137,7 @@ public class DiscoveryNetworkReconnectTe
             allowing (managementContext).unregisterMBean(with(equal(
                     new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
             allowing (managementContext).unregisterMBean(with(equal(
-                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=NC"))));
             allowing (managementContext).unregisterMBean(with(equal(            
                     new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
             allowing (managementContext).unregisterMBean(with(equal(

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
Thu Feb 10 11:02:48 2011
@@ -18,7 +18,7 @@ package org.apache.activemq.usecases;
 
 import java.net.URI;
 import java.util.List;
-
+import java.util.concurrent.TimeUnit;
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
@@ -26,23 +26,34 @@ import javax.jms.TextMessage;
 import junit.framework.Test;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class);
     private static final int NETWORK_DOWN_TIME = 5000;
     protected static final int MESSAGE_COUNT = 200;
     private static final String HUB = "HubBroker";
     private static final String SPOKE = "SpokeBroker";
     private SocketProxy socketProxy;
     private long networkDownTimeStart;
-    public boolean useDuplexNetworkBridge;
+    public boolean useDuplexNetworkBridge = true;
     public boolean sumulateStalledNetwork;
+    private long inactiveDuration = 1000;
+    private boolean useSocketProxy = true;
 
    
     public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
@@ -75,6 +86,80 @@ public class BrokerQueueNetworkWithDisco
                 MESSAGE_COUNT <= msgs.getMessageCount());
     }
 
+    public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
+        inactiveDuration=60000l;
+        useDuplexNetworkBridge = true;
+
+        bridgeBrokers(SPOKE, HUB);
+
+        final BrokerItem hub = brokers.get(HUB);
+        hub.broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    int sleepCount = 2;
+                    @Override
+                    public void removeConnection(ConnectionContext context,
+                            ConnectionInfo info, Throwable error)
+                            throws Exception {
+                        try {
+                            while(--sleepCount >= 0) {
+                                LOG.info("sleeping for a bit in close impl to simulate load
where reconnect fails due to a pending close");
+                                TimeUnit.SECONDS.sleep(2);
+                            }
+                        } catch (Exception ignored) {}
+                        super.removeConnection(context, info, error);
+                    }
+                }
+        });
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        // kill the initiator side, leaving remote end intact
+        // simulate async network breakage
+        // remote side will need to spot duplicate network and stop/kill the original
+        for (int i=0; i< 3;  i++) {
+            socketProxy.halfClose();
+            sleep(10000);
+        }
+        // wait for full reformation of bridge       
+        // verify no extra connections
+        boolean allGood = Wait.waitFor(new Wait.Condition(){ 
+                    public boolean isSatisified() throws Exception {
+                        long numConnections = hub.broker.getTransportConnectors().get(0).getConnections().size();
+                        LOG.info("Num connetions:" + numConnections);
+                        return numConnections == 1;
+                    }});
+        if (!allGood) {
+            dumpAllThreads("ExtraHubConnection");
+        }
+        assertTrue("should be only one transport connection for the single duplex network
connector", allGood);
+
+        allGood = Wait.waitFor(new Wait.Condition(){
+                    public boolean isSatisified() throws Exception {
+                        long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount();
+                        LOG.info("Num VM connetions:" + numVmConnections);
+                        return numVmConnections == 1;
+                    }});
+        if (!allGood) {
+            dumpAllThreads("ExtraHubVMConnection");
+        }
+        assertTrue("should be only one vm connection for the single network duplex network
connector", allGood);
+    }
+    
+    public void testTwoDuplexNCsAreAllowed() throws Exception {
+        useDuplexNetworkBridge = true;
+        useSocketProxy = false;
+
+        NetworkConnector connector = bridgeBrokers(SPOKE, HUB);
+        connector.setName("FirstDuplex");
+        connector = bridgeBrokers(SPOKE, HUB);
+        connector.setName("SecondDuplex");
+
+        startAllBrokers(); 
+        waitForBridgeFormation();
+
+        BrokerItem hub = brokers.get(HUB);
+        assertEquals("Has two transport Connectors", 2, hub.broker.getTransportConnectors().get(0).getConnections().size());
+    }
     
     @Override
     protected void startAllBrokers() throws Exception {
@@ -88,6 +173,8 @@ public class BrokerQueueNetworkWithDisco
 
     public void setUp() throws Exception {
         networkDownTimeStart = 0;
+        inactiveDuration = 1000;
+        useSocketProxy = true;
         super.setAutoFail(true);
         super.setUp();
         final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
@@ -95,6 +182,13 @@ public class BrokerQueueNetworkWithDisco
         createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
     }
     
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (socketProxy != null) {
+            socketProxy.close();
+        }
+    }
+    
     public static Test suite() {
         return suite(BrokerQueueNetworkWithDisconnectTest.class);
     }
@@ -133,16 +227,18 @@ public class BrokerQueueNetworkWithDisco
         }    
     }
 
-
     @Override
     protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
         List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
         URI remoteURI;
         if (!transportConnectors.isEmpty()) {
             remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
-            socketProxy = new SocketProxy(remoteURI);
-            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + socketProxy.getUrl() 
-                    + "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false"));
+            if (useSocketProxy) {
+                socketProxy = new SocketProxy(remoteURI);
+                remoteURI = socketProxy.getUrl();
+            }
+            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI 
+                    + "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")?useExponentialBackOff=false"));
             connector.setDynamicOnly(dynamicOnly);
             connector.setNetworkTTL(networkTTL);
             localBroker.addNetworkConnector(connector);
@@ -154,7 +250,5 @@ public class BrokerQueueNetworkWithDisco
         } else {
             throw new Exception("Remote broker has no registered connectors.");
         }
-
     }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=1069339&r1=1069338&r2=1069339&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Thu Feb 10 11:02:48 2011
@@ -50,7 +50,7 @@ public class SocketProxy {
     
     private CountDownLatch closed = new CountDownLatch(1);
 
-    public List<Connection> connections = new LinkedList<Connection>();
+    public List<Bridge> connections = new LinkedList<Bridge>();
 
     private int listenPort = 0;
 
@@ -102,18 +102,33 @@ public class SocketProxy {
      * close all proxy connections and acceptor
      */
     public void close() {
-        List<Connection> connections;
+        List<Bridge> connections;
         synchronized(this.connections) {
-            connections = new ArrayList<Connection>(this.connections);
+            connections = new ArrayList<Bridge>(this.connections);
         }            
         LOG.info("close, numConnectons=" + connections.size());
-        for (Connection con : connections) {
+        for (Bridge con : connections) {
             closeConnection(con);
         }
         acceptor.close();
         closed.countDown();
     }
 
+    /*
+     * close all proxy receive connections, leaving acceptor
+     * open
+     */
+    public void halfClose() {
+        List<Bridge> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Bridge>(this.connections);
+        }            
+        LOG.info("halfClose, numConnectons=" + connections.size());
+        for (Bridge con : connections) {
+            halfCloseConnection(con);
+        }
+    }
+
     public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
         return closed.await(timeoutSeconds, TimeUnit.SECONDS);
     }
@@ -138,7 +153,7 @@ public class SocketProxy {
         synchronized(connections) {
             LOG.info("pause, numConnectons=" + connections.size());
             acceptor.pause();
-            for (Connection con : connections) {
+            for (Bridge con : connections) {
                 con.pause();
             }
         }
@@ -150,14 +165,14 @@ public class SocketProxy {
     public void goOn() {
         synchronized(connections) {
             LOG.info("goOn, numConnectons=" + connections.size());
-            for (Connection con : connections) {
+            for (Bridge con : connections) {
                 con.goOn();
             }
         }
         acceptor.goOn();
     }
 
-    private void closeConnection(Connection c) {
+    private void closeConnection(Bridge c) {
         try {
             c.close();
         } catch (Exception e) {
@@ -165,20 +180,28 @@ public class SocketProxy {
         }
     }
 
+    private void halfCloseConnection(Bridge c) {
+        try {
+            c.halfClose();
+        } catch (Exception e) {
+            LOG.debug("exception on half close of: " + c, e);
+        }
+    }
+
     private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
         int listenPort = serverSocket.getLocalPort();
 
         return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
     }
 
-    public class Connection {
+    public class Bridge {
 
         private Socket receiveSocket;
         private Socket sendSocket;
         private Pump requestThread;
         private Pump responseThread;
 
-        public Connection(Socket socket, URI target) throws Exception {
+        public Bridge(Socket socket, URI target) throws Exception {
             receiveSocket = socket;
             sendSocket = new Socket();
             if (receiveBufferSize > 0) {
@@ -207,10 +230,14 @@ public class SocketProxy {
             sendSocket.close();
         }
 
+        public void halfClose() throws Exception {
+            receiveSocket.close();
+        }
+
         private void linkWithThreads(Socket source, Socket dest) {
             requestThread = new Pump(source, dest);
-            responseThread = new Pump(dest, source);
             requestThread.start();
+            responseThread = new Pump(dest, source);
             responseThread.start();
         }
 
@@ -252,12 +279,15 @@ public class SocketProxy {
                 } catch (Exception e) {
                     LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
                     try {
-                        close();
+                        if (!receiveSocket.isClosed()) {
+                            // for halfClose, on read/write failure if we close the
+                            // remote end will see a close at the same time.
+                            close();
+                        }
                     } catch (Exception ignore) {
                     }
                 }
             }
-
         }
     }
 
@@ -293,14 +323,13 @@ public class SocketProxy {
                     pause.get().await();
                     try {
                         Socket source = socket.accept();
-                        LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
                         pause.get().await();
                         if (receiveBufferSize > 0) {
                             source.setReceiveBufferSize(receiveBufferSize);
                         }
                         LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
                         synchronized(connections) {
-                            connections.add(new Connection(source, target));
+                            connections.add(new Bridge(source, target));
                         }
                     } catch (SocketTimeoutException expected) {
                     }



Mime
View raw message