Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 2616 invoked from network); 6 May 2010 11:14:36 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 6 May 2010 11:14:36 -0000 Received: (qmail 50313 invoked by uid 500); 6 May 2010 11:14:36 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 50265 invoked by uid 500); 6 May 2010 11:14:35 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 50258 invoked by uid 99); 6 May 2010 11:14:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 May 2010 11:14:35 +0000 X-ASF-Spam-Status: No, hits=-1402.8 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 May 2010 11:14:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4499C23888E3; Thu, 6 May 2010 11:13:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r941661 - in /camel/trunk/components: camel-mina/src/main/java/org/apache/camel/component/mina/ camel-netty/src/main/java/org/apache/camel/component/netty/ camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ camel-netty/src... Date: Thu, 06 May 2010 11:13:42 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100506111342.4499C23888E3@eris.apache.org> Author: davsclaus Date: Thu May 6 11:13:41 2010 New Revision: 941661 URL: http://svn.apache.org/viewvc?rev=941661&view=rev Log: CAMEL-2699: Improve camel-netty to properly shutdown. 
Also add features which we have in camel-mina but weren't ported to camel-netty yet. Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java (with props) Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java camel/trunk/components/camel-netty/src/test/resources/log4j.properties Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original) +++ 
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu May 6 11:13:41 2010 @@ -105,7 +105,12 @@ public class MinaConsumer extends Defaul if (endpoint.getConfiguration().getCharsetName() != null) { exchange.setProperty(Exchange.CHARSET_NAME, endpoint.getConfiguration().getCharsetName()); } - getProcessor().process(exchange); + + try { + getProcessor().process(exchange); + } catch (Throwable e) { + getExceptionHandler().handleException(e); + } // if sync then we should return a response if (sync) { Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java (original) +++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java Thu May 6 11:13:41 2010 @@ -39,7 +39,7 @@ public final class MinaHelper { * * @param session the MINA session * @param body the body to write (send) - * @param exchange the mina exchange used for error reporting + * @param exchange the exchange * @throws CamelExchangeException is thrown if the body could not be written for some reasons * (eg remote connection is closed etc.) 
*/ @@ -48,6 +48,9 @@ public final class MinaHelper { WriteFuture future = session.write(body); // must use a timeout (we use 10s) as in some very high performance scenarios a write can cause // thread hanging forever + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for write to complete"); + } future.join(10 * 1000L); if (!future.isWritten()) { LOG.warn("Cannot write body: " + body + " using session: " + session); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 11:13:41 2010 @@ -57,6 +57,7 @@ public class NettyConfiguration { private int maxPoolSize; private String keyStoreFormat; private String securityProvider; + private boolean disconnect; public NettyConfiguration() { setKeepAlive(true); @@ -139,6 +140,9 @@ public class NettyConfiguration { if (settings.containsKey("maxPoolSize")) { setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize"))); } + if (settings.containsKey("disconnect")) { + setDisconnect(Boolean.valueOf((String) settings.get("disconnect"))); + } } public String getProtocol() { @@ -221,7 +225,6 @@ public class NettyConfiguration { this.sslHandler = sslHandler; } - public List getEncoders() { return encoders; } @@ -354,6 +357,18 @@ public class NettyConfiguration { this.securityProvider = securityProvider; } + public boolean isDisconnect() { + return disconnect; + } + + public void setDisconnect(boolean disconnect) { + this.disconnect = disconnect; + } + + public String 
getAddress() { + return host + ":" + port; + } + private void addToHandlersList(List configured, List handlers, Class handlerType) { if (handlers != null) { for (int x = 0; x < handlers.size(); x++) { Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941661&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May 6 11:13:41 2010 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty; + +/** + * Netty constants + * + * @version $Revision$ + */ +public final class NettyConstants { + + public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete"; + public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext"; + public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent"; + + private NettyConstants() { + // Utility class + } + +} Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 11:13:41 2010 @@ -20,12 +20,14 @@ import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; +import 
org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; @@ -39,12 +41,17 @@ public class NettyConsumer extends Defau private DatagramChannelFactory datagramChannelFactory; private ServerBootstrap serverBootstrap; private ConnectionlessBootstrap connectionlessServerBootstrap; - - public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, - NettyConfiguration configuration) { + private Channel channel; + + public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) { super(nettyEndpoint, processor); - this.configuration = nettyEndpoint.getConfiguration(); this.context = this.getEndpoint().getCamelContext(); + this.configuration = configuration; + } + + @Override + public NettyEndpoint getEndpoint() { + return (NettyEndpoint) super.getEndpoint(); } @Override @@ -55,45 +62,28 @@ public class NettyConsumer extends Defau } else { initializeTCPServerSocketCommunicationLayer(); } + + LOG.info("Netty consumer bound to: " + configuration.getAddress()); } @Override protected void doStop() throws Exception { - super.doStop(); - } - - private void initializeTCPServerSocketCommunicationLayer() throws Exception { - ExecutorService bossExecutor = - context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - ExecutorService workerExecutor = - context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); - serverBootstrap = new ServerBootstrap(channelFactory); - - serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); - serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); - 
serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); - serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); - serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - LOG.info("Netty TCP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort()); - } + if (LOG.isInfoEnabled()) { + LOG.info("Netty consumer unbinding from: " + configuration.getAddress()); + } - private void initializeUDPServerSocketCommunicationLayer() throws Exception { - ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize()); - datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); - connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); - - connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); - connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); - connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); - connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); - connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast()); - connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize()); - connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize()); - connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - LOG.info("Netty UDP Consumer started and now listening on Host: " + configuration.getHost() + 
" Port: " + configuration.getPort()); + + if (channel != null) { + NettyHelper.close(channel); + } + + // TODO: use ChannelGroup to keep track on open connections etc to be closed on stopping + // and then releasing channel factory would be faster +// if (channelFactory != null) { +// channelFactory.releaseExternalResources(); +// } + + super.doStop(); } public NettyConfiguration getConfiguration() { @@ -112,13 +102,11 @@ public class NettyConsumer extends Defau this.channelFactory = channelFactory; } - public DatagramChannelFactory getDatagramChannelFactory() { return datagramChannelFactory; } - public void setDatagramChannelFactory( - DatagramChannelFactory datagramChannelFactory) { + public void setDatagramChannelFactory(DatagramChannelFactory datagramChannelFactory) { this.datagramChannelFactory = datagramChannelFactory; } @@ -134,9 +122,43 @@ public class NettyConsumer extends Defau return connectionlessServerBootstrap; } - public void setConnectionlessServerBootstrap( - ConnectionlessBootstrap connectionlessServerBootstrap) { + public void setConnectionlessServerBootstrap(ConnectionlessBootstrap connectionlessServerBootstrap) { this.connectionlessServerBootstrap = connectionlessServerBootstrap; } + private void initializeTCPServerSocketCommunicationLayer() throws Exception { + ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + + channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); + serverBootstrap = new ServerBootstrap(channelFactory); + serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); + serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); + serverBootstrap.setOption("child.tcpNoDelay", 
configuration.isTcpNoDelay()); + serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); + serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + + channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } + + private void initializeUDPServerSocketCommunicationLayer() throws Exception { + ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + + datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); + connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); + connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this)); + connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); + connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); + connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); + connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast()); + connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize()); + connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize()); + + channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } + } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941661&r1=941660&r2=941661&view=diff 
============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May 6 11:13:41 2010 @@ -43,13 +43,13 @@ public class NettyEndpoint extends Defau public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) { Exchange exchange = createExchange(); - exchange.getIn().setHeader("NettyChannelHandlerContext", ctx); - exchange.getIn().setHeader("NettyMessageEvent", messageEvent); + exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx); + exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent); return exchange; } public boolean isSingleton() { - return false; + return true; } public NettyConfiguration getConfiguration() { Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Thu May 6 11:13:41 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.component.netty; +import java.net.SocketAddress; + import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.commons.logging.Log; @@ -45,17 +47,34 @@ public final class NettyHelper { * @throws CamelExchangeException is thrown if the body could not be written for some reasons * (eg remote connection is closed etc.) 
*/ - public static void writeBody(Channel channel, Object body, Exchange exchange) throws CamelExchangeException { + public static void writeBody(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException { // the write operation is asynchronous. Use future to wait until the session has been written - ChannelFuture future = channel.write(body); + ChannelFuture future; + if (remoteAddress != null) { + future = channel.write(body, remoteAddress); + } else { + future = channel.write(body); + } // wait for the write + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for write to complete"); + } future.awaitUninterruptibly(); // if it was not a success then thrown an exception if (future.isSuccess() == false) { LOG.warn("Cannot write body: " + body + " using channel: " + channel); - throw new CamelExchangeException("Cannot write body", exchange); + throw new CamelExchangeException("Cannot write body", exchange, future.getCause()); + } + } + + public static void close(Channel channel) { + if (channel != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Closing channel: " + channel); + } + channel.close().awaitUninterruptibly(); } } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 11:13:41 2010 @@ -22,11 +22,13 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; +import org.apache.camel.CamelException; import 
org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.ServicePoolAware; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.bootstrap.ClientBootstrap; @@ -56,7 +58,19 @@ public class NettyProducer extends Defau super(nettyEndpoint); this.configuration = configuration; this.context = this.getEndpoint().getCamelContext(); - } + } + + @Override + public NettyEndpoint getEndpoint() { + return (NettyEndpoint) super.getEndpoint(); + } + + @Override + public boolean isSingleton() { + // the producer should not be singleton otherwise cannot use concurrent producers and safely + // use request/reply with correct correlation + return false; + } @Override protected void doStart() throws Exception { @@ -70,14 +84,17 @@ public class NettyProducer extends Defau @Override protected void doStop() throws Exception { - super.doStop(); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping producer at address: " + configuration.getAddress()); + } + if (channelFuture != null) { + NettyHelper.close(channelFuture.getChannel()); + } + if (channelFactory != null) { + channelFactory.releaseExternalResources(); + } - @Override - public boolean isSingleton() { - // the producer should not be singleton otherwise cannot use concurrent producers and safely - // use request/reply with correct correlation - return false; + super.doStop(); } public void process(Exchange exchange) throws Exception { @@ -87,7 +104,7 @@ public class NettyProducer extends Defau // write the body Channel channel = channelFuture.getChannel(); - NettyHelper.writeBody(channel, exchange.getIn().getBody(), exchange); + NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange); if (configuration.isSync()) { boolean success = 
countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS); @@ -96,21 +113,35 @@ public class NettyProducer extends Defau } Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse(); exchange.getOut().setBody(response); - } + } + + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } + + // should we disconnect, the header can override the configuration + boolean disconnect = getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; + } + if (disconnect) { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel when complete at address: " + getEndpoint().getConfiguration().getAddress()); + } + NettyHelper.close(channel); + } } protected void setupTCPCommunication() throws Exception { if (channelFactory == null) { - ExecutorService bossExecutor = - context.getExecutorServiceStrategy().newThreadPool(this, - "NettyTCPBoss", - configuration.getCorePoolSize(), - configuration.getMaxPoolSize()); - ExecutorService workerExecutor = - context.getExecutorServiceStrategy().newThreadPool(this, - "NettyTCPWorker", - configuration.getCorePoolSize(), - configuration.getMaxPoolSize()); + ExecutorService bossExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); + ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); } if (clientBootstrap == null) { @@ -125,18 +156,20 @@ public class NettyProducer extends Defau clientPipeline = 
clientPipelineFactory.getPipeline(); clientBootstrap.setPipeline(clientPipeline); } - channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + + channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); channelFuture.awaitUninterruptibly(); - LOG.info("Netty TCP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort()); + if (!channelFuture.isSuccess()) { + throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); + } + + LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress()); } - + protected void setupUDPCommunication() throws Exception { if (datagramChannelFactory == null) { - ExecutorService workerExecutor = - context.getExecutorServiceStrategy().newThreadPool(this, - "NettyUDPWorker", - configuration.getCorePoolSize(), - configuration.getMaxPoolSize()); + ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", + configuration.getCorePoolSize(), configuration.getMaxPoolSize()); datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); } if (connectionlessClientBootstrap == null) { @@ -155,12 +188,17 @@ public class NettyProducer extends Defau clientPipeline = clientPipelineFactory.getPipeline(); connectionlessClientBootstrap.setPipeline(clientPipeline); } + connectionlessClientBootstrap.bind(new InetSocketAddress(0)); - channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); channelFuture.awaitUninterruptibly(); - LOG.info("Netty UDP Producer started and now listening on Host: " + configuration.getHost() + "Port : " + configuration.getPort()); - } - + if 
(!channelFuture.isSuccess()) { + throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); + } + + LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress()); + } + public NettyConfiguration getConfiguration() { return configuration; } @@ -216,5 +254,5 @@ public class NettyProducer extends Defau public void setClientPipeline(ChannelPipeline clientPipeline) { this.clientPipeline = clientPipeline; } - + } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 11:13:41 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.component.netty.handlers; +import org.apache.camel.CamelException; +import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.NettyProducer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,20 +39,25 @@ public class ClientChannelHandler extend } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("An exception was caught by the ClientChannelHandler during communication", exceptionEvent.getCause()); + LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); } + 
// close channel in case an exception was thrown + NettyHelper.close(exceptionEvent.getChannel()); + + // must wrap and rethrow since cause can be of Throwable and we must only throw Exception + throw new CamelException(exceptionEvent.getCause()); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) - throws Exception { - response = messageEvent.getMessage(); + public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { + setResponse(messageEvent.getMessage()); + if (LOG.isDebugEnabled()) { LOG.debug("Incoming message:" + response); } + if (producer.getConfiguration().isSync()) { producer.getCountdownLatch().countDown(); } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 11:13:41 2010 @@ -16,17 +16,15 @@ */ package org.apache.camel.component.netty.handlers; -import java.net.InetSocketAddress; - -import org.apache.camel.CamelExchangeException; +import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.component.netty.NettyConsumer; -import org.apache.camel.component.netty.NettyEndpoint; +import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ExceptionEvent; @@ -44,37 +42,55 @@ public class ServerChannelHandler extend } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("An exception was caught by the ServerChannelHandler during communication", exceptionEvent.getCause()); + LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); } + // close channel in case an exception was thrown + NettyHelper.close(exceptionEvent.getChannel()); + + // must wrap and rethrow since cause can be of Throwable and we must only throw Exception + throw new CamelException(exceptionEvent.getCause()); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) - throws Exception { + public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { Object in = messageEvent.getMessage(); if (LOG.isDebugEnabled()) { if (in instanceof byte[]) { + // byte arrays is not readable so convert to string in = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in); } LOG.debug("Incoming message: " + in); } - // Dispatch exchange along the route and receive the final resulting exchange - dispatchExchange(ctx, messageEvent, in); + // create Exchange and let the consumer process it + Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent); + if (consumer.getConfiguration().isSync()) { + exchange.setPattern(ExchangePattern.InOut); + } + exchange.getIn().setBody(in); + + try { + consumer.getProcessor().process(exchange); + } catch (Throwable e) { + 
consumer.getExceptionHandler().handleException(e); + } + + // send back response if the communication is synchronous + if (consumer.getConfiguration().isSync()) { + sendResponse(messageEvent, exchange); + } } - private void sendResponsetoChannel(MessageEvent messageEvent, Exchange exchange) throws Exception { - ChannelFuture future; + private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception { Object body; if (ExchangeHelper.isOutCapable(exchange)) { body = exchange.getOut().getBody(); } else { body = exchange.getIn().getBody(); } - + if (exchange.isFailed()) { if (exchange.getException() == null) { // fault detected @@ -83,54 +99,43 @@ public class ServerChannelHandler extend body = exchange.getException(); } } - + if (body == null) { - LOG.warn("No Oubound Response received following route completion: " + exchange); - LOG.warn("A response cannot be sent to the Client"); - messageEvent.getChannel().close(); - } - - if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) { - future = messageEvent.getChannel().write(body, messageEvent.getRemoteAddress()); + // must close session if no data to write otherwise client will never receive a response + // and wait forever (if not timing out) + LOG.warn("Cannot write body since its null, closing channel: " + exchange); + NettyHelper.close(messageEvent.getChannel()); } else { - future = messageEvent.getChannel().write(body); - } - - if (!future.isSuccess()) { - String hostname = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getHostName(); - int port = ((InetSocketAddress)messageEvent.getChannel().getRemoteAddress()).getPort(); - throw new CamelExchangeException("Could not send response via Channel to remote host " + hostname + " and port " + port, exchange); - } - - if (LOG.isDebugEnabled()) { - if (body instanceof byte[]) { - body = consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body); + // we got a body to write + if 
(LOG.isDebugEnabled()) { + LOG.debug("Writing body" + body); + } + if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) { + NettyHelper.writeBody(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange); + } else { + NettyHelper.writeBody(messageEvent.getChannel(), null, body, exchange); } - LOG.debug("Sent Outgoing message: " + body); - } - } - - private void dispatchExchange(ChannelHandlerContext ctx, MessageEvent messageEvent, Object in) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Consumer Dispatching the Incoming exchange along the route"); } - Exchange exchange = ((NettyEndpoint)consumer.getEndpoint()).createExchange(ctx, messageEvent); - if (consumer.getConfiguration().isSync()) { - exchange.setPattern(ExchangePattern.InOut); + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); } - exchange.getIn().setBody(in); - - try { - consumer.getProcessor().process(exchange); - } catch (Exception exception) { - throw new CamelExchangeException("Error in consumer while dispatching exchange for further processing", exchange); + + // should we disconnect, the header can override the configuration + boolean disconnect = consumer.getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; } - - // Send back response if the communication is synchronous - if (consumer.getConfiguration().isSync()) { - sendResponsetoChannel(messageEvent, exchange); + if (disconnect) { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel when complete at address: " + messageEvent.getRemoteAddress()); + } + NettyHelper.close(messageEvent.getChannel()); } } - + } Added: 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java?rev=941661&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java Thu May 6 11:13:41 2010 @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyDisconnectTest extends CamelTestSupport { + + private String uri = "netty:tcp://localhost:8080?sync=true&disconnect=true"; + + @Test + public void testCloseSessionWhenComplete() throws Exception { + Object out = template.requestBody(uri, "Claus"); + assertEquals("Bye Claus", out); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from(uri).process(new Processor() { + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye " + body); + } + }); + } + }; + } + +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyDisconnectTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java?rev=941661&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java (added) +++ 
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java Thu May 6 11:13:41 2010 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyInOutCloseChannelWhenCompleteTest extends CamelTestSupport { + + @Test + public void testCloseSessionWhenComplete() throws Exception { + Object out = template.requestBody("netty:tcp://localhost:8080?sync=true", "Claus"); + assertEquals("Bye Claus", out); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("netty:tcp://localhost:8080?sync=true").process(new Processor() { + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye " + body); + 
exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); + } + }); + } + }; + } + +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutCloseChannelWhenCompleteTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=941661&r1=941660&r2=941661&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/resources/log4j.properties (original) +++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Thu May 6 11:13:41 2010 @@ -18,12 +18,12 @@ # # The logging properties used for eclipse testing, We want to see debug output on the console. # -log4j.rootLogger=DEBUG, file +log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging log4j.logger.org.apache.camel.component.netty=DEBUG -log4j.logger.org.apache.camel=DEBUG -log4j.logger.org.apache.commons.net=TRACE +#log4j.logger.org.apache.camel=DEBUG +#log4j.logger.org.apache.commons.net=TRACE # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender