activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1075801 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/ test/java/org/apache/activemq/util/
Date Tue, 01 Mar 2011 12:59:09 GMT
Author: gtully
Date: Tue Mar  1 12:59:09 2011
New Revision: 1075801

URL: http://svn.apache.org/viewvc?rev=1075801&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3195 - NetworkConnector initialization should be
backed by an executor
Added new broker attribute networkConnectorStartAsync (default false) that causes network
connectors to be started in separate threads, so that one connector blocking during startup
does not affect the rest.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1075801&r1=1075800&r2=1075801&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Tue Mar  1 12:59:09 2011
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -194,8 +195,8 @@ public class BrokerService implements Se
     private boolean slave = true;
     private int schedulePeriodForDestinationPurge=5000;
     private BrokerContext brokerContext;
-    
-    
+    private boolean networkConnectorStartAsync = false;
+
 	static {
         String localHostName = "localhost";
         try {
@@ -2075,16 +2076,50 @@ public class BrokerService implements Se
                 waitForSlave();
             }
             if (!stopped.get()) {
+                ThreadPoolExecutor networkConnectorStartExecutor = null;
+                if (isNetworkConnectorStartAsync()) {
+                    // spin up as many threads as needed
+                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+                            new ThreadFactory() {
+                                int count=0;
+                                public Thread newThread(Runnable runnable) {
+                                    Thread thread = new Thread(runnable, "NetworkConnector
Start Thread-" +(count++));
+                                    thread.setDaemon(true);
+                                    return thread;
+                                }
+                            });
+                }
+
                 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator();
iter.hasNext();) {
-                    NetworkConnector connector = iter.next();
+                    final NetworkConnector connector = iter.next();
                     connector.setLocalUri(uri);
                     connector.setBrokerName(getBrokerName());
                     connector.setDurableDestinations(durableDestinations);
                     if (getDefaultSocketURIString() != null) {
                         connector.setBrokerURL(getDefaultSocketURIString());
                     }
-                    connector.start();
+                    if (networkConnectorStartExecutor != null) {
+                        networkConnectorStartExecutor.execute(new Runnable() {
+                            public void run() {
+                                try {
+                                    LOG.info("Async start of " + connector);
+                                    connector.start();
+                                } catch(Exception e) {
+                                    LOG.error("Async start of network connector: " + connector
+ " failed", e);
+                                }
+                            }
+                        });
+                    } else {
+                        connector.start();
+                    }
+                }
+                if (networkConnectorStartExecutor != null) {
+                    // executor done when enqueued tasks are complete
+                    networkConnectorStartExecutor.shutdown();
+                    networkConnectorStartExecutor = null;
                 }
+
                 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator();
iter.hasNext();) {
                     ProxyConnector connector = iter.next();
                     connector.start();
@@ -2350,4 +2385,12 @@ public class BrokerService implements Se
     public void setUseAuthenticatedPrincipalForJMXUserID(boolean useAuthenticatedPrincipalForJMXUserID)
{
         this.useAuthenticatedPrincipalForJMXUserID = useAuthenticatedPrincipalForJMXUserID;
     }
+
+    public boolean isNetworkConnectorStartAsync() {
+        return networkConnectorStartAsync;
+    }
+
+    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
+        this.networkConnectorStartAsync = networkConnectorStartAsync;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1075801&r1=1075800&r2=1075801&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Tue Mar  1 12:59:09 2011
@@ -156,13 +156,24 @@ public class JmsMultipleBrokersTestSuppo
     protected void waitForBridgeFormation(final int min) throws Exception {
         for (BrokerItem brokerItem : brokers.values()) {
             final BrokerService broker = brokerItem.broker;
-            if (!broker.getNetworkConnectors().isEmpty()) {
-                Wait.waitFor(new Wait.Condition() {
-                    public boolean isSatisified() throws Exception {
-                        return (broker.getNetworkConnectors().get(0).activeBridges().size()
>= min);
-                    }}, Wait.MAX_WAIT_MILLIS * 2);
-            }
+            waitForBridgeFormation(broker, min, 0);
+        }
+    }
+
+    public boolean waitForBridgeFormation(final BrokerService broker, final int min, final
int bridgeIndex) throws Exception {
+        return waitForBridgeFormation(broker, min, bridgeIndex, Wait.MAX_WAIT_MILLIS*2);
+    }
+
+    public boolean waitForBridgeFormation(final BrokerService broker, final int min, final
int bridgeIndex, long wait) throws Exception {
+
+        boolean result = false;
+        if (!broker.getNetworkConnectors().isEmpty()) {
+            result = Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return (broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size()
>= min);
+                }}, wait);
         }
+        return result;
     }
 
     protected void waitForBridgeFormation() throws Exception {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java?rev=1075801&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
Tue Mar  1 12:59:09 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.SocketProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetworkAsyncStartTest extends JmsMultipleBrokersTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(NetworkAsyncStartTest.class);
+
+    private String brokerBUri = "tcp://localhost:61617";
+    private String brokerCUri = "tcp://localhost:61618";
+    int bridgeCount=0;
+
+    public void testAsyncNetworkStartup() throws Exception {
+
+        BrokerService brokerA = brokers.get("BrokerA").broker;
+        bridgeBroker(brokerA, brokerBUri);
+        bridgeBroker(brokerA, brokerCUri);
+
+        LOG.info("starting A, no blocking on failed network connectors");
+        brokerA.start();
+
+        LOG.info("starting C transport connector");
+        BrokerService brokerC = brokers.get("BrokerC").broker;
+        brokerC.addConnector(brokerCUri);
+        brokerC.start();
+
+        assertTrue("got bridge to C", waitForBridgeFormation(brokerA, 1, 1));
+        LOG.info("Got bridge A->C");
+
+        LOG.info("starting B transport connector");
+        BrokerService brokerB = brokers.get("BrokerB").broker;
+        brokerB.addConnector(brokerBUri);
+        brokerC.start();
+
+        assertTrue("got bridge to B", waitForBridgeFormation(brokerA, 1, 0));
+        assertTrue("got bridge to B&C", waitForBridgeFormation(brokerA, 1, 1));
+    }
+
+    public void testAsyncNetworkStartupWithSlowConnectionCreation() throws Exception {
+
+        final BrokerService brokerA = brokers.get("BrokerA").broker;
+
+        SocketProxy proxyToB = new SocketProxy();
+        // don't accept any connections so limited to one connection with backlog
+        proxyToB.setPauseAtStart(true);
+        proxyToB.setAcceptBacklog(1);
+        proxyToB.setTarget(new URI(brokerBUri));
+        proxyToB.open();
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, proxyToB.getUrl().toString());
+        bridgeBroker(brokerA, brokerCUri);
+
+        Executor e = Executors.newCachedThreadPool();
+        e.execute(new Runnable() {
+            public void run() {
+                LOG.info("starting A");
+                try {
+                    brokerA.setNetworkConnectorStartAsync(true);
+                    brokerA.start();
+                } catch (Exception e) {
+                    LOG.error("start failed", e);
+                }
+            }
+        });
+
+        LOG.info("starting transport connector on C");
+        BrokerService brokerC = brokers.get("BrokerC").broker;
+        brokerC.addConnector(brokerCUri);
+        brokerC.start();
+
+        final long maxWaitMillis = 20*1000;
+        assertTrue("got bridge to C in 10 seconds", waitForBridgeFormation(brokerA, 1, 7,
maxWaitMillis));
+    }
+
+    private void bridgeBroker(BrokerService localBroker, String remoteURI) throws Exception
{
+        String uri = "static:(" + remoteURI + ")";
+        NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+        connector.setName("bridge-" + bridgeCount++);
+        localBroker.addNetworkConnector(connector);
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        // initially with no tcp transport connector
+        createBroker(new URI("broker:()BrokerA?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:()BrokerB?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:()BrokerC?persistent=false&useJmx=false"));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=1075801&r1=1075800&r2=1075801&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Tue
Mar  1 12:59:09 2011
@@ -56,6 +56,10 @@ public class SocketProxy {
 
     private int receiveBufferSize = -1;
 
+    private boolean pauseAtStart = false;
+
+    private int acceptBacklog = 50;
+
     public SocketProxy() throws Exception {    
     }
     
@@ -84,12 +88,15 @@ public class SocketProxy {
             serverSocket.setReceiveBufferSize(receiveBufferSize);
         }
         if (proxyUrl == null) {
-            serverSocket.bind(new InetSocketAddress(listenPort));
+            serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
             proxyUrl = urlFromSocket(target, serverSocket);
         } else {
             serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
         }
         acceptor = new Acceptor(serverSocket, target);
+        if (pauseAtStart) {
+            acceptor.pause();
+        }
         new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
         closed = new CountDownLatch(1);
     }
@@ -188,6 +195,22 @@ public class SocketProxy {
         }
     }
 
+    public boolean isPauseAtStart() {
+        return pauseAtStart;
+    }
+
+    public void setPauseAtStart(boolean pauseAtStart) {
+        this.pauseAtStart = pauseAtStart;
+    }
+
+    public int getAcceptBacklog() {
+        return acceptBacklog;
+    }
+
+    public void setAcceptBacklog(int acceptBacklog) {
+        this.acceptBacklog = acceptBacklog;
+    }
+
     private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
         int listenPort = serverSocket.getLocalPort();
 



Mime
View raw message