Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 47580 invoked from network); 1 Mar 2011 12:59:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 1 Mar 2011 12:59:34 -0000 Received: (qmail 13316 invoked by uid 500); 1 Mar 2011 12:59:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13248 invoked by uid 500); 1 Mar 2011 12:59:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13241 invoked by uid 99); 1 Mar 2011 12:59:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2011 12:59:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2011 12:59:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BD46023888E7; Tue, 1 Mar 2011 12:59:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1075801 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/ test/java/org/apache/activemq/util/ Date: Tue, 01 Mar 2011 12:59:09 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110301125909.BD46023888E7@eris.apache.org> Author: gtully Date: Tue Mar 1 12:59:09 2011 New Revision: 1075801 URL: http://svn.apache.org/viewvc?rev=1075801&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3195 - NetworkConnector initialization should be backed by an executor 
added new broker attribute networkConnectorStartAsync (default false) that will cause network connectors to be started in separate threads such that a block of one does not affect the rest Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1075801&r1=1075800&r2=1075801&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Mar 1 12:59:09 2011 @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -194,8 +195,8 @@ public class BrokerService implements Se private boolean slave = true; private int schedulePeriodForDestinationPurge=5000; private BrokerContext brokerContext; - - + private boolean networkConnectorStartAsync = false; + static { String localHostName = "localhost"; try { @@ -2075,16 +2076,50 @@ public class BrokerService implements Se waitForSlave(); } if (!stopped.get()) { + ThreadPoolExecutor networkConnectorStartExecutor = null; + if 
(isNetworkConnectorStartAsync()) { + // spin up as many threads as needed + networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 10, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactory() { + int count=0; + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); + thread.setDaemon(true); + return thread; + } + }); + } + for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { - NetworkConnector connector = iter.next(); + final NetworkConnector connector = iter.next(); connector.setLocalUri(uri); connector.setBrokerName(getBrokerName()); connector.setDurableDestinations(durableDestinations); if (getDefaultSocketURIString() != null) { connector.setBrokerURL(getDefaultSocketURIString()); } - connector.start(); + if (networkConnectorStartExecutor != null) { + networkConnectorStartExecutor.execute(new Runnable() { + public void run() { + try { + LOG.info("Async start of " + connector); + connector.start(); + } catch(Exception e) { + LOG.error("Async start of network connector: " + connector + " failed", e); + } + } + }); + } else { + connector.start(); + } + } + if (networkConnectorStartExecutor != null) { + // executor done when enqueued tasks are complete + networkConnectorStartExecutor.shutdown(); + networkConnectorStartExecutor = null; } + for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { ProxyConnector connector = iter.next(); connector.start(); @@ -2350,4 +2385,12 @@ public class BrokerService implements Se public void setUseAuthenticatedPrincipalForJMXUserID(boolean useAuthenticatedPrincipalForJMXUserID) { this.useAuthenticatedPrincipalForJMXUserID = useAuthenticatedPrincipalForJMXUserID; } + + public boolean isNetworkConnectorStartAsync() { + return networkConnectorStartAsync; + } + + public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { + this.networkConnectorStartAsync = 
networkConnectorStartAsync; + } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1075801&r1=1075800&r2=1075801&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Tue Mar 1 12:59:09 2011 @@ -156,13 +156,24 @@ public class JmsMultipleBrokersTestSuppo protected void waitForBridgeFormation(final int min) throws Exception { for (BrokerItem brokerItem : brokers.values()) { final BrokerService broker = brokerItem.broker; - if (!broker.getNetworkConnectors().isEmpty()) { - Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min); - }}, Wait.MAX_WAIT_MILLIS * 2); - } + waitForBridgeFormation(broker, min, 0); + } + } + + public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex) throws Exception { + return waitForBridgeFormation(broker, min, bridgeIndex, Wait.MAX_WAIT_MILLIS*2); + } + + public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex, long wait) throws Exception { + + boolean result = false; + if (!broker.getNetworkConnectors().isEmpty()) { + result = Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return (broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size() >= min); + }}, wait); } + return result; } protected void waitForBridgeFormation() throws Exception { Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java?rev=1075801&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java Tue Mar 1 12:59:09 2011 @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.SocketProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetworkAsyncStartTest extends JmsMultipleBrokersTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(NetworkAsyncStartTest.class); + + private String brokerBUri = "tcp://localhost:61617"; + private String brokerCUri = "tcp://localhost:61618"; + int bridgeCount=0; + + public void testAsyncNetworkStartup() throws Exception { + + BrokerService brokerA = brokers.get("BrokerA").broker; + bridgeBroker(brokerA, brokerBUri); + bridgeBroker(brokerA, brokerCUri); + + LOG.info("starting A, no blocking on failed network connectors"); + brokerA.start(); + + LOG.info("starting C transport connector"); + BrokerService brokerC = brokers.get("BrokerC").broker; + brokerC.addConnector(brokerCUri); + brokerC.start(); + + assertTrue("got bridge to C", waitForBridgeFormation(brokerA, 1, 1)); + LOG.info("Got bridge A->C"); + + LOG.info("starting B transport connector"); + BrokerService brokerB = brokers.get("BrokerB").broker; + brokerB.addConnector(brokerBUri); + brokerC.start(); + + assertTrue("got bridge to B", waitForBridgeFormation(brokerA, 1, 0)); + assertTrue("got bridge to B&C", waitForBridgeFormation(brokerA, 1, 1)); + } + + public void testAsyncNetworkStartupWithSlowConnectionCreation() throws Exception { + + final BrokerService brokerA = brokers.get("BrokerA").broker; + + SocketProxy proxyToB = new SocketProxy(); + // don't accept any connections so limited to one connection with backlog + proxyToB.setPauseAtStart(true); + proxyToB.setAcceptBacklog(1); + 
proxyToB.setTarget(new URI(brokerBUri)); + proxyToB.open(); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, proxyToB.getUrl().toString()); + bridgeBroker(brokerA, brokerCUri); + + Executor e = Executors.newCachedThreadPool(); + e.execute(new Runnable() { + public void run() { + LOG.info("starting A"); + try { + brokerA.setNetworkConnectorStartAsync(true); + brokerA.start(); + } catch (Exception e) { + LOG.error("start failed", e); + } + } + }); + + LOG.info("starting transport connector on C"); + BrokerService brokerC = brokers.get("BrokerC").broker; + brokerC.addConnector(brokerCUri); + brokerC.start(); + + final long maxWaitMillis = 20*1000; + assertTrue("got bridge to C in 10 seconds", waitForBridgeFormation(brokerA, 1, 7, maxWaitMillis)); + } + + private void bridgeBroker(BrokerService localBroker, String remoteURI) throws Exception { + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); + connector.setName("bridge-" + bridgeCount++); + localBroker.addNetworkConnector(connector); + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + // initially with no tcp transport connector + createBroker(new URI("broker:()BrokerA?persistent=false&useJmx=false")); + createBroker(new URI("broker:()BrokerB?persistent=false&useJmx=false")); + createBroker(new URI("broker:()BrokerC?persistent=false&useJmx=false")); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkAsyncStartTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=1075801&r1=1075800&r2=1075801&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Tue Mar 1 12:59:09 2011 @@ -56,6 +56,10 @@ public class SocketProxy { private int receiveBufferSize = -1; + private boolean pauseAtStart = false; + + private int acceptBacklog = 50; + public SocketProxy() throws Exception { } @@ -84,12 +88,15 @@ public class SocketProxy { serverSocket.setReceiveBufferSize(receiveBufferSize); } if (proxyUrl == null) { - serverSocket.bind(new InetSocketAddress(listenPort)); + serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog); proxyUrl = urlFromSocket(target, serverSocket); } else { serverSocket.bind(new InetSocketAddress(proxyUrl.getPort())); } acceptor = new Acceptor(serverSocket, target); + if (pauseAtStart) { + acceptor.pause(); + } new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start(); closed = new CountDownLatch(1); } @@ -188,6 +195,22 @@ public class SocketProxy { } } + public boolean isPauseAtStart() { + return pauseAtStart; + } + + public void setPauseAtStart(boolean pauseAtStart) { + this.pauseAtStart = pauseAtStart; + } + + public int getAcceptBacklog() { + return acceptBacklog; + } + + public void setAcceptBacklog(int acceptBacklog) { + this.acceptBacklog = acceptBacklog; + } + private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception 
{ int listenPort = serverSocket.getLocalPort();