activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r808890 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/discovery/simple/ test/java/org/apache/activemq/transport/discovery/ test/java/org/apache/activemq/util/
Date Fri, 28 Aug 2009 14:16:30 GMT
Author: gtully
Date: Fri Aug 28 14:16:29 2009
New Revision: 808890

URL: http://svn.apache.org/viewvc?rev=808890&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-1855 - effects multicast discovery also;
disposed is now atomic and the bridge.stop() waits for shutdown info to be sent before closing
the transport, this will alleviate the InvalidClientIDException. bridges are now only remembered
after they start which alleviates the retry problem with a failed bridge remaining, added
test that exercises the code but which does not demonstrate the behavour on a dual core

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
Fri Aug 28 14:16:29 2009
@@ -70,7 +70,7 @@
                     ServiceSupport.dispose(this);
                 }
             }
-            if (!disposed) {
+            if (!disposed.get()) {
                 triggerLocalStartBridge();
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Aug 28 14:16:29 2009
@@ -103,7 +103,7 @@
     protected int demandConsumerDispatched;
     protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
     protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
-    protected boolean disposed;
+    protected AtomicBoolean disposed = new AtomicBoolean();
     protected BrokerId localBrokerId;
     protected ActiveMQDestination[] excludedDestinations;
     protected ActiveMQDestination[] dynamicallyIncludedDestinations;
@@ -281,7 +281,7 @@
 
                 localConnectionInfo = new ConnectionInfo();
                 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                localClientId = "NC_" + remoteBrokerName + "_inbound" + configuration.getBrokerName();
+                localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
                 localConnectionInfo.setClientId(localClientId);
                 localConnectionInfo.setUserName(configuration.getUserName());
                 localConnectionInfo.setPassword(configuration.getPassword());
@@ -345,7 +345,7 @@
                 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
                 remoteBroker.oneway(demandConsumerInfo);
                 startedLatch.countDown();
-                if (!disposed) {
+                if (!disposed.get()) {
                     triggerLocalStartBridge();
                 }
             }
@@ -354,37 +354,36 @@
 
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
-            LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName
+ " is disposed already ? " + disposed);
-            boolean wasDisposedAlready = disposed;
-            if (!disposed) {
+            if (disposed.compareAndSet(false, true)) {
+                LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " +
remoteBrokerName);
                 NetworkBridgeListener l = this.networkBridgeListener;
                 if (l != null) {
                     l.onStop(this);
                 }
                 try {
-                    disposed = true;
                     remoteBridgeStarted.set(false);
                     final CountDownLatch sendShutdown = new CountDownLatch(1);
                     ASYNC_TASKS.execute(new Runnable() {
                         public void run() {
                             try {
                                 localBroker.oneway(new ShutdownInfo());
+                                sendShutdown.countDown();
                                 remoteBroker.oneway(new ShutdownInfo());
                             } catch (Throwable e) {
                                 LOG.debug("Caught exception sending shutdown", e);
-                            }finally {
+                            } finally {
                                 sendShutdown.countDown();
                             }
                             
                         }
                     });
-                    if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
+                    if( !sendShutdown.await(5, TimeUnit.SECONDS) ) {
                         LOG.debug("Network Could not shutdown in a timely manner");
                     }
                 } finally {
                     ServiceStopper ss = new ServiceStopper();
-                    ss.stop(localBroker);
                     ss.stop(remoteBroker);
+                    ss.stop(localBroker);
                     // Release the started Latch since another thread could be
                     // stuck waiting for it to start up.
                     startedLatch.countDown();
@@ -393,16 +392,12 @@
                     ss.throwFirstException();
                 }
             }
-            if (wasDisposedAlready) {
-                LOG.debug(configuration.getBrokerName() + " bridge to " + remoteBrokerName
+ " stopped");
-            } else {
-                LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName
+ " stopped");
-            }
+            LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + "
stopped");
         }
     }
 
     public void serviceRemoteException(Throwable error) {
-        if (!disposed) {
+        if (!disposed.get()) {
             if (error instanceof SecurityException || error instanceof GeneralSecurityException)
{
                 LOG.error("Network connection between " + localBroker + " and " + remoteBroker
+ " shutdown due to a remote error: " + error);
             } else {
@@ -419,7 +414,7 @@
     }
 
     protected void serviceRemoteCommand(Command command) {    	      	
-        if (!disposed) {
+        if (!disposed.get()) {
             try {
                 if (command.isMessageDispatch()) {
                     waitStarted();
@@ -606,7 +601,7 @@
     }
 
     public void serviceLocalException(Throwable error) {
-        if (!disposed) {
+        if (!disposed.get()) {
             LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ " shutdown due to a local error: " + error);
             LOG.debug("The local Exception was:" + error, error);
             ASYNC_TASKS.execute(new Runnable() {
@@ -652,7 +647,7 @@
     }
 
     protected void serviceLocalCommand(Command command) {
-        if (!disposed) {
+        if (!disposed.get()) {
             try {
                 if (command.isMessageDispatch()) {
                     enqueueCounter.incrementAndGet();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Fri Aug 28 14:16:29 2009
@@ -126,9 +126,9 @@
                 SslContext.setCurrentSslContext(null);
             }
             NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
-            bridges.put(uri, bridge);
             try {
                 bridge.start();
+                bridges.put(uri, bridge);
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Fri Aug 28 14:16:29 2009
@@ -161,7 +161,6 @@
 
                     event.connectTime = System.currentTimeMillis();
                     event.failed.set(false);
-
                     listener.onServiceAdd(event);
                 }
             });

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Fri Aug 28 14:16:29 2009
@@ -34,6 +34,7 @@
 import org.jmock.integration.junit4.JMock;
 import org.jmock.integration.junit4.JUnit4Mockery;
 import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,16 +44,17 @@
 public class DiscoveryNetworkReconnectTest {
 
     private static final Log LOG = LogFactory.getLog(DiscoveryNetworkReconnectTest.class);
-
+    final int maxReconnects = 5;
     BrokerService brokerA, brokerB;
     Mockery context;
     ManagementContext managementContext;
     
     final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
-    final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=600";
+    final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=1000";
 
     private DiscoveryAgent agent;
-
+    SocketProxy proxy;
+    
     @Before
     public void setUp() throws Exception {
         context = new JUnit4Mockery() {{
@@ -64,22 +66,10 @@
         configure(brokerA);
         brokerA.addConnector("tcp://localhost:0");
         brokerA.start();
-    }
-
-    private void configure(BrokerService broker) {
-        broker.setPersistent(false);
-        broker.setUseJmx(true);      
-    }
-    
-    @Test
-    public void testReconnect() throws Exception {
-        final SocketProxy proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
-        
-        // control multicast publish advertise agent to inject proxy
-        agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
-        agent.registerService(proxy.getUrl().toString());
-        agent.start();
         
+        proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
+        //new SocketProxy(new URI("tcp://localhost:61617"));
+    
         managementContext = context.mock(ManagementContext.class);
         
         context.checking(new Expectations(){{
@@ -97,40 +87,70 @@
                     new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
             
             // due to reconnect we get two registrations
-            atLeast(2).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
+            atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)),
with(equal(
                     new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"

                             + proxy.getUrl().getPort()))));
         }});
-
+        
         brokerB = new BrokerService();
         brokerB.setManagementContext(managementContext);
         brokerB.setBrokerName("BrokerNC");
-        configure(brokerB);       
-        brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true");
+        configure(brokerB);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerA.stop();
+        brokerB.stop();
+    }
+    
+    private void configure(BrokerService broker) {
+        broker.setPersistent(false);
+        broker.setUseJmx(true);      
+    }
+    
+    @Test
+    public void testMulicastReconnect() throws Exception {     
+        
+        // control multicast advertise agent to inject proxy
+        agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
+        agent.registerService(proxy.getUrl().toString());
+        agent.start();
+
+        brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
         brokerB.start();
+        doReconnect();
+    }
+    
+    
 
-        Wait.waitFor(new Wait.Condition() {
-            public boolean isSatisified() throws Exception {
-               return proxy.connections.size() == 1;
-            }
-        });
-       
-        // force an inactivity timeout timeout
-        proxy.pause();
-        
-        // wait for the inactivity timeout
-        Thread.sleep(2000);
-        
-        // let a reconnect succeed
-        proxy.goOn();
-        
-        assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() {
-            public boolean isSatisified() throws Exception {
-               return proxy.connections.size() == 1;
-            }
-        }));
+    @Test
+    public void testSimpleReconnect() throws Exception {
+        brokerB.addNetworkConnector("simple://(" + proxy.getUrl() 
+                + ")?useExponentialBackOff=false&initialReconnectDelay=500&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+        brokerB.start();       
+        doReconnect();
+    }
+
+    private void doReconnect() throws Exception {
         
-        brokerB.stop();
-        // let mockery validate minimal duplicate mbean registrations
+        for (int i=0; i<maxReconnects; i++) {
+            // Wait for connection
+            assertTrue("we got a network connection in a timely manner", Wait.waitFor(new
Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                   return proxy.connections.size() == 1;
+                }
+            }));
+            Thread.sleep(1000);
+            
+            // force an inactivity timeout timeout
+            proxy.pause();
+        
+            // wait for the inactivity timeout
+            Thread.sleep(3000);
+        
+            // let a reconnect succeed
+            proxy.goOn();       
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java?rev=808890&r1=808889&r2=808890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
Fri Aug 28 14:16:29 2009
@@ -51,6 +51,16 @@
         assertEquals(2, data.getComponents().length);
     }
 
+    
+    public void testCompositeWithComponentParam() throws Exception {
+        CompositeData data = URISupport.parseComposite(new URI("test:(part1://host?part1=true)?outside=true"));
+        assertEquals(1, data.getComponents().length);
+        assertEquals(1, data.getParameters().size());
+        Map part1Params = URISupport.parseParamters(data.getComponents()[0]);
+        assertEquals(1, part1Params.size());
+        assertTrue(part1Params.containsKey("part1"));
+    }
+    
     public void testParsingURI() throws Exception {
         URI source = new URI("tcp://localhost:61626/foo/bar?cheese=Edam&x=123");
         
@@ -70,7 +80,9 @@
     }
     
     public void testParsingCompositeURI() throws URISyntaxException {
-        URISupport.parseComposite(new URI("broker://(tcp://localhost:61616)?name=foo"));
+        CompositeData data = URISupport.parseComposite(new URI("broker://(tcp://localhost:61616)?name=foo"));
+        assertEquals("one component", 1, data.getComponents().length);
+        assertEquals("Size: " + data.getParameters(), 1, data.getParameters().size());
     }
     
     public void testCheckParenthesis() throws Exception {



Mime
View raw message