Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 26026 invoked from network); 6 May 2010 12:41:06 -0000 Received: from unknown (HELO mail.apache.org) (::) by ::ffff:140.211.11.9 with SMTP; 6 May 2010 12:41:06 -0000 Received: (qmail 76383 invoked by uid 500); 6 May 2010 12:41:06 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 76354 invoked by uid 500); 6 May 2010 12:41:06 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 76342 invoked by uid 99); 6 May 2010 12:41:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 May 2010 12:41:06 +0000 X-ASF-Spam-Status: No, hits=-1404.7 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 May 2010 12:41:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id AF95123888E4; Thu, 6 May 2010 12:40:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r941699 - in /camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty: NettyConsumer.java NettyProducer.java handlers/ClientChannelHandler.java handlers/ServerChannelHandler.java Date: Thu, 06 May 2010 12:40:14 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100506124014.AF95123888E4@eris.apache.org> Author: davsclaus Date: Thu May 6 12:40:14 2010 New Revision: 941699 URL: http://svn.apache.org/viewvc?rev=941699&view=rev Log: CAMEL-2699: camel-netty now shutdown properly. Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941699&r1=941698&r2=941699&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 12:40:14 2010 @@ -20,7 +20,6 @@ import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.commons.logging.Log; @@ -29,12 +28,16 @@ import org.jboss.netty.bootstrap.Connect import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class NettyConsumer extends DefaultConsumer { private static final transient Log LOG = LogFactory.getLog(NettyConsumer.class); + private final ChannelGroup allChannels; private CamelContext context; private NettyConfiguration configuration; private ChannelFactory channelFactory; @@ -47,6 +50,7 @@ public class NettyConsumer extends Defau super(nettyEndpoint, processor); this.context = this.getEndpoint().getCamelContext(); this.configuration = configuration; + this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri()); } @Override @@ -72,20 +76,22 @@ public class NettyConsumer extends Defau LOG.info("Netty consumer unbinding from: " + configuration.getAddress()); } - - if (channel != null) { - NettyHelper.close(channel); + // close all channels + ChannelGroupFuture future = allChannels.close(); + future.awaitUninterruptibly(); + + // and then release other resources + if (channelFactory != null) { + channelFactory.releaseExternalResources(); } - // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping - // and then releasing channel factory would be faster -// if (channelFactory != null) { -// channelFactory.releaseExternalResources(); -// } - super.doStop(); } - + + public ChannelGroup getAllChannels() { + return allChannels; + } + public NettyConfiguration getConfiguration() { return configuration; } @@ -108,8 +114,8 @@ public class NettyConsumer extends Defau public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) { this.datagramChannelFactory = datagramChannelFactory; - } - + } + public ServerBootstrap getServerBootstrap() { return serverBootstrap; } @@ -124,8 +130,8 @@ public class NettyConsumer extends Defau public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) { this.connectionlessServerBootstrap = connectionlessServerBootstrap; - } - + } + private void initializeTCPServerSocketCommunicationLayer() throws Exception { ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", configuration.getCorePoolSize(), configuration.getMaxPoolSize()); @@ -141,6 +147,8 @@ public class NettyConsumer extends Defau serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + // to keep track of all channels in use + allChannels.add(channel); } private void initializeUDPServerSocketCommunicationLayer() throws Exception { @@ -159,6 +167,8 @@ public class NettyConsumer extends Defau connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize()); channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + // to keep track of all channels in use + allChannels.add(channel); } } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941699&r1=941698&r2=941699&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 12:40:14 2010 @@ -37,18 +37,23 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; public class NettyProducer extends DefaultProducer implements ServicePoolAware { private static final transient Log LOG = LogFactory.getLog(NettyProducer.class); + private final ChannelGroup allChannels; private CamelContext context; private NettyConfiguration configuration; private CountDownLatch countdownLatch; private ChannelFactory channelFactory; private DatagramChannelFactory datagramChannelFactory; private ChannelFuture channelFuture; + private Channel channel; private ClientBootstrap clientBootstrap; private ConnectionlessBootstrap connectionlessClientBootstrap; private ClientPipelineFactory clientPipelineFactory; @@ -58,6 +63,7 @@ public class NettyProducer extends Defau super(nettyEndpoint); this.configuration = configuration; this.context = this.getEndpoint().getCamelContext(); + this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri()); } @Override @@ -87,9 +93,12 @@ public class NettyProducer extends Defau if (LOG.isDebugEnabled()) { LOG.debug("Stopping producer at address: " + configuration.getAddress()); } - if (channelFuture != null) { - NettyHelper.close(channelFuture.getChannel()); - } + + // close all channels + ChannelGroupFuture future = allChannels.close(); + future.awaitUninterruptibly(); + + // and then release other resources if (channelFactory != null) { channelFactory.releaseExternalResources(); } @@ -103,7 +112,6 @@ public class NettyProducer extends Defau } // write the body - Channel channel = channelFuture.getChannel(); NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange); if (configuration.isSync()) { @@ -162,6 +170,9 @@ public class NettyProducer extends Defau if (!channelFuture.isSuccess()) { throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); } + channel = channelFuture.getChannel(); + // to keep track of all channels in use + allChannels.add(channel); LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress()); } @@ -195,6 +206,9 @@ public class NettyProducer extends Defau if (!channelFuture.isSuccess()) { throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); } + channel = channelFuture.getChannel(); + // to keep track of all channels in use + allChannels.add(channel); LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress()); } @@ -255,4 +269,7 @@ public class NettyProducer extends Defau this.clientPipeline = clientPipeline; } + public ChannelGroup getAllChannels() { + return allChannels; + } } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 12:40:14 2010 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; @@ -39,6 +40,12 @@ public class ClientChannelHandler extend } @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception { + // to keep track of open sockets + producer.getAllChannels().add(channelStateEvent.getChannel()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941699&r1=941698&r2=941699&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 12:40:14 2010 @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; @@ -42,6 +43,12 @@ public class ServerChannelHandler extend } @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception { + // to keep track of open sockets + consumer.getAllChannels().add(channelStateEvent.getChannel()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); @@ -108,7 +115,7 @@ public class ServerChannelHandler extend } else { // we got a body to write if (LOG.isDebugEnabled()) { - LOG.debug("Writing body" + body); + LOG.debug("Writing body: " + body); } if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) { NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange);