Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A79B9637 for ; Sun, 8 Apr 2012 12:52:41 +0000 (UTC) Received: (qmail 38646 invoked by uid 500); 8 Apr 2012 12:52:41 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 38612 invoked by uid 500); 8 Apr 2012 12:52:41 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 38605 invoked by uid 99); 8 Apr 2012 12:52:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 08 Apr 2012 12:52:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 08 Apr 2012 12:52:39 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0767E2388980; Sun, 8 Apr 2012 12:52:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1310991 - in /camel/branches/camel-2.9.x: ./ components/camel-netty/src/main/java/org/apache/camel/component/netty/ components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ components/camel-netty/src/test/java/org/ap... Date: Sun, 08 Apr 2012 12:52:18 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120408125219.0767E2388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Sun Apr 8 12:52:17 2012 New Revision: 1310991 URL: http://svn.apache.org/viewvc?rev=1310991&view=rev Log: CAMEL-5150: Some cleanup in camel-netty according to Netty docs. Added: camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java - copied unchanged from r1310979, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1310979 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310991&r1=1310990&r2=1310991&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sun Apr 8 12:52:17 2012 @@ -68,8 +68,6 @@ public class NettyConfiguration implemen private long sendBufferSize = 65536; private long receiveBufferSize = 65536; private int receiveBufferSizePredictor; - private int corePoolSize = 10; - private int maxPoolSize = 100; private int workerCount; private String keyStoreFormat; private String securityProvider; @@ -385,22 +383,6 @@ public class NettyConfiguration implemen this.trustStoreFile = trustStoreFile; } - public int getCorePoolSize() { - return corePoolSize; - } - - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - } - - public int getMaxPoolSize() { - return maxPoolSize; - } - - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - public String getKeyStoreFormat() { return keyStoreFormat; } Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310991&r1=1310990&r2=1310991&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Sun Apr 8 12:52:17 2012 @@ -46,12 +46,14 @@ public class NettyConsumer extends Defau private ServerBootstrap serverBootstrap; private ConnectionlessBootstrap connectionlessServerBootstrap; private Channel channel; + private ExecutorService bossExecutor; + private ExecutorService workerExecutor; public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) { super(nettyEndpoint, processor); this.context = this.getEndpoint().getCamelContext(); this.configuration = configuration; - this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri()); + this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri()); } @Override @@ -78,14 +80,23 @@ public class NettyConsumer extends Defau LOG.debug("Netty consumer unbinding from: {}", configuration.getAddress()); // close all channels + LOG.trace("Closing {} channels", allChannels.size()); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); - // and then release other resources + // close server external resources if (channelFactory != null) { channelFactory.releaseExternalResources(); } + // and then shutdown the thread pools + if (bossExecutor != null) { + context.getExecutorServiceManager().shutdownNow(bossExecutor); + } + if (workerExecutor != null) { + context.getExecutorServiceManager().shutdownNow(workerExecutor); + } + super.doStop(); LOG.info("Netty consumer unbound from: " + configuration.getAddress()); @@ -144,12 +155,10 @@ public class NettyConsumer extends Defau } private void initializeTCPServerSocketCommunicationLayer() throws Exception { - ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); - if (configuration.getWorkerCount() == 0) { + if (configuration.getWorkerCount() <= 0) { channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); } else { channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, @@ -164,6 +173,7 @@ public class NettyConsumer extends Defau } serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); + serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); @@ -173,10 +183,12 @@ public class NettyConsumer extends Defau } private void initializeUDPServerSocketCommunicationLayer() throws Exception { - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); + if (configuration.getWorkerCount() <= 0) { + datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); + } else { + datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); + } connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); if (configuration.getServerPipelineFactory() != null) { configuration.getServerPipelineFactory().setConsumer(this); @@ -186,6 +198,7 @@ public class NettyConsumer extends Defau } connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); + connectionlessServerBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast()); Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310991&r1=1310990&r2=1310991&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr 8 12:52:17 2012 @@ -55,6 +55,8 @@ public class NettyProducer extends Defau private ChannelFactory channelFactory; private DatagramChannelFactory datagramChannelFactory; private CamelLogger noReplyLogger; + private ExecutorService bossExecutor; + private ExecutorService workerExecutor; public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) { super(nettyEndpoint); @@ -103,6 +105,7 @@ public class NettyProducer extends Defau protected void doStop() throws Exception { LOG.debug("Stopping producer at address: {}", configuration.getAddress()); // close all channels + LOG.trace("Closing {} channels", ALL_CHANNELS.size()); ChannelGroupFuture future = ALL_CHANNELS.close(); future.awaitUninterruptibly(); @@ -110,6 +113,15 @@ public class NettyProducer extends Defau if (channelFactory != null) { channelFactory.releaseExternalResources(); } + + // and then shutdown the thread pools + if (bossExecutor != null) { + context.getExecutorServiceManager().shutdownNow(bossExecutor); + } + if (workerExecutor != null) { + context.getExecutorServiceManager().shutdownNow(workerExecutor); + } + super.doStop(); } @@ -208,18 +220,15 @@ public class NettyProducer extends Defau protected void setupTCPCommunication() throws Exception { if (channelFactory == null) { - ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); } } protected void setupUDPCommunication() throws Exception { if (datagramChannelFactory == null) { - ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker", - configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); } } @@ -243,16 +252,17 @@ public class NettyProducer extends Defau if (isTcp()) { ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory); - clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); - clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); - clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); + clientBootstrap.setOption("keepAlive", configuration.isKeepAlive()); + clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay()); + clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); + clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout()); // set the pipeline on the bootstrap clientBootstrap.setPipeline(clientPipeline); answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); return answer; } else { + // TODO: Is this correct for a UDP client ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310991&r1=1310990&r2=1310991&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Sun Apr 8 12:52:17 2012 @@ -42,7 +42,7 @@ public class ClientChannelHandler extend private final Exchange exchange; private final AsyncCallback callback; private boolean messageReceived; - private boolean exceptionHandled; + private volatile boolean exceptionHandled; public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) { this.producer = producer; Modified: camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties?rev=1310991&r1=1310990&r2=1310991&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties (original) +++ camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties Sun Apr 8 12:52:17 2012 @@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging #log4j.logger.org.apache.camel.component.netty=TRACE #log4j.logger.org.apache.camel=DEBUG -#log4j.logger.org.apache.commons.net=TRACE +#log4j.logger.org.jboss.netty=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender