camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [02/26] git commit: CAMEL-6555 Updated the handlers to netty4 API
Date Tue, 22 Jul 2014 13:52:50 GMT
CAMEL-6555 Updated the handlers to netty4 API


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/08630d21
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/08630d21
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/08630d21

Branch: refs/heads/master
Commit: 08630d219384d8d56cf1ffe7cf97bbba27e6caae
Parents: 28551f2
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Thu Jul 17 17:01:18 2014 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Tue Jul 22 21:25:17 2014 +0800

----------------------------------------------------------------------
 .../netty4/DefaultClientPipelineFactory.java    |  2 +-
 .../netty4/DefaultServerPipelineFactory.java    | 21 +++---
 .../component/netty4/NettyConfiguration.java    | 13 ++--
 .../camel/component/netty4/NettyConverter.java  | 35 +++++-----
 .../camel/component/netty4/NettyEndpoint.java   | 23 ++++---
 .../camel/component/netty4/NettyHelper.java     |  9 ++-
 .../camel/component/netty4/NettyProducer.java   |  2 +-
 .../component/netty4/ServerPipelineFactory.java |  8 ++-
 .../netty4/ShareableChannelHandlerFactory.java  | 19 ++++++
 .../netty4/handlers/ClientChannelHandler.java   | 57 ++++++++--------
 .../netty4/handlers/ServerChannelHandler.java   | 72 ++++++++++----------
 .../handlers/ServerResponseFutureListener.java  |  9 +--
 12 files changed, 151 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
index cb53264..0c0f4d6 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
@@ -86,7 +86,7 @@ public class DefaultClientPipelineFactory extends ClientPipelineFactory
 {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
             }
-            ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(),
TimeUnit.MILLISECONDS);
+            ChannelHandler timeout = new ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(),
TimeUnit.MILLISECONDS);
             addToPipeline("timeout", channelPipeline, timeout);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
index e09a5b6..f981fce 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
@@ -16,20 +16,21 @@
  */
 package org.apache.camel.component.netty4;
 
+import java.nio.channels.Channels;
 import java.util.List;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
-import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
 import org.apache.camel.util.ObjectHelper;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.Channels;
-import io.netty.handler.execution.ExecutionHandler;
 import io.netty.handler.ssl.SslHandler;
+import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
+import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,13 +68,14 @@ public class DefaultServerPipelineFactory extends ServerPipelineFactory
{
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline channelPipeline = Channels.pipeline();
+    protected void initChannel(Channel ch) throws Exception {
+        // create a new pipeline
+        ChannelPipeline channelPipeline = ch.pipeline();
 
         SslHandler sslHandler = configureServerSSLOnDemand();
         if (sslHandler != null) {
-            // must close on SSL exception
-            sslHandler.setCloseOnSSLException(true);
+            //TODO  must close on SSL exception
+            //sslHandler.setCloseOnSSLException(true);
             LOG.debug("Server SSL handler configured and added as an interceptor against
the ChannelPipeline: {}", sslHandler);
             addToPipeline("ssl", channelPipeline, sslHandler);
         }
@@ -112,7 +114,7 @@ public class DefaultServerPipelineFactory extends ServerPipelineFactory
{
         addToPipeline("handler", channelPipeline, new ServerChannelHandler(consumer));
 
         LOG.trace("Created ChannelPipeline: {}", channelPipeline);
-        return channelPipeline;
+
     }
 
     private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler)
{
@@ -184,4 +186,5 @@ public class DefaultServerPipelineFactory extends ServerPipelineFactory
{
     public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
         return new DefaultServerPipelineFactory(consumer);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index a52fd79..639ec88 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty4;
 
+
 import java.io.File;
 import java.net.URI;
 import java.nio.charset.Charset;
@@ -23,6 +24,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.Delimiters;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.UriParam;
@@ -30,11 +36,6 @@ import org.apache.camel.spi.UriParams;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
-import io.netty.buffer.ChannelBuffer;
-import io.netty.channel.ChannelHandler;
-import io.netty.handler.codec.frame.Delimiters;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,7 +192,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration
implem
                 if (isTextline()) {
                     Charset charset = getEncoding() != null ? Charset.forName(getEncoding())
: CharsetUtil.UTF_8;
                     encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
-                    ChannelBuffer[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter()
: Delimiters.nulDelimiter();
+                    ByteBuf[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter()
: Delimiters.nulDelimiter();
                     decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength,
delimiters));
                     decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
 

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
index 62cc3a2..6800209 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.UnsupportedEncodingException;
+
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stax.StAXSource;
@@ -28,11 +29,13 @@ import javax.xml.transform.stream.StreamSource;
 
 import org.w3c.dom.Document;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
-import io.netty.buffer.ChannelBuffer;
-import io.netty.buffer.ChannelBufferInputStream;
-import io.netty.buffer.ChannelBuffers;
+
 
 /**
  * A set of converter methods for working with Netty types
@@ -47,12 +50,12 @@ public final class NettyConverter {
     }
 
     @Converter
-    public static byte[] toByteArray(ChannelBuffer buffer, Exchange exchange) {
+    public static byte[] toByteArray(ByteBuf buffer, Exchange exchange) {
         return buffer.array();
     }
 
     @Converter
-    public static String toString(ChannelBuffer buffer, Exchange exchange) throws UnsupportedEncodingException
{
+    public static String toString(ByteBuf buffer, Exchange exchange) throws UnsupportedEncodingException
{
         byte[] bytes = toByteArray(buffer, exchange);
         // use type converter as it can handle encoding set on the Exchange
         if (exchange != null) {
@@ -62,25 +65,25 @@ public final class NettyConverter {
     }
 
     @Converter
-    public static InputStream toInputStream(ChannelBuffer buffer, Exchange exchange) {
-        return new ChannelBufferInputStream(buffer);
+    public static InputStream toInputStream(ByteBuf buffer, Exchange exchange) {
+        return new ByteBufInputStream(buffer);
     }
 
     @Converter
-    public static ObjectInput toObjectInput(ChannelBuffer buffer, Exchange exchange) throws
IOException {
+    public static ObjectInput toObjectInput(ByteBuf buffer, Exchange exchange) throws IOException
{
         InputStream is = toInputStream(buffer, exchange);
         return new ObjectInputStream(is);
     }
 
     @Converter
-    public static ChannelBuffer toByteBuffer(byte[] bytes, Exchange exchange) {
-        ChannelBuffer buf = ChannelBuffers.dynamicBuffer(bytes.length);
+    public static ByteBuf toByteBuffer(byte[] bytes, Exchange exchange) {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
         buf.writeBytes(bytes);
         return buf;
     }
 
     @Converter
-    public static ChannelBuffer toByteBuffer(String s, Exchange exchange) {
+    public static ByteBuf toByteBuffer(String s, Exchange exchange) {
         byte[] bytes;
         if (exchange != null) {
             // use type converter as it can handle encoding set on the Exchange
@@ -92,31 +95,31 @@ public final class NettyConverter {
     }
 
     @Converter
-    public static Document toDocument(ChannelBuffer buffer, Exchange exchange) {
+    public static Document toDocument(ByteBuf buffer, Exchange exchange) {
         InputStream is = toInputStream(buffer, exchange);
         return exchange.getContext().getTypeConverter().convertTo(Document.class, exchange,
is);
     }
 
     @Converter
-    public static DOMSource toDOMSource(ChannelBuffer buffer, Exchange exchange) {
+    public static DOMSource toDOMSource(ByteBuf buffer, Exchange exchange) {
         InputStream is = toInputStream(buffer, exchange);
         return exchange.getContext().getTypeConverter().convertTo(DOMSource.class, exchange,
is);
     }
 
     @Converter
-    public static SAXSource toSAXSource(ChannelBuffer buffer, Exchange exchange) {
+    public static SAXSource toSAXSource(ByteBuf buffer, Exchange exchange) {
         InputStream is = toInputStream(buffer, exchange);
         return exchange.getContext().getTypeConverter().convertTo(SAXSource.class, exchange,
is);
     }
 
     @Converter
-    public static StreamSource toStreamSource(ChannelBuffer buffer, Exchange exchange) {
+    public static StreamSource toStreamSource(ByteBuf buffer, Exchange exchange) {
         InputStream is = toInputStream(buffer, exchange);
         return exchange.getContext().getTypeConverter().convertTo(StreamSource.class, exchange,
is);
     }
 
     @Converter
-    public static StAXSource toStAXSource(ChannelBuffer buffer, Exchange exchange) {
+    public static StAXSource toStAXSource(ByteBuf buffer, Exchange exchange) {
         InputStream is = toInputStream(buffer, exchange);
         return exchange.getContext().getTypeConverter().convertTo(StAXSource.class, exchange,
is);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
index ebc0ce8..3487ef7 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -18,10 +18,15 @@ package org.apache.camel.component.netty4;
 
 import java.math.BigInteger;
 import java.security.Principal;
+
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.security.cert.X509Certificate;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.Timer;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -32,15 +37,12 @@ import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.util.ObjectHelper;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.MessageEvent;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.Timer;
 
 @UriEndpoint(scheme = "netty", consumerClass = NettyConsumer.class)
 public class NettyEndpoint extends DefaultEndpoint {
     @UriParam
     private NettyConfiguration configuration;
+    // TODO do we really need this time in netty4
     private Timer timer;
 
     public NettyEndpoint(String endpointUri, NettyComponent component, NettyConfiguration
configuration) {
@@ -63,10 +65,10 @@ public class NettyEndpoint extends DefaultEndpoint {
         }
     }
 
-    public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent)
throws Exception {
+    public Exchange createExchange(ChannelHandlerContext ctx, Object message) throws Exception
{
         Exchange exchange = createExchange();
-        updateMessageHeader(exchange.getIn(), ctx, messageEvent);
-        NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
+        updateMessageHeader(exchange.getIn(), ctx);
+        NettyPayloadHelper.setIn(exchange, message);
         return exchange;
     }
     
@@ -115,11 +117,10 @@ public class NettyEndpoint extends DefaultEndpoint {
         return sslSession;
     }
 
-    protected void updateMessageHeader(Message in, ChannelHandlerContext ctx, MessageEvent
messageEvent) {
+    protected void updateMessageHeader(Message in, ChannelHandlerContext ctx) {
         in.setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
-        in.setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
-        in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
-        in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, messageEvent.getChannel().getLocalAddress());
+        in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, ctx.channel().remoteAddress());
+        in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, ctx.channel().localAddress());
 
         if (configuration.isSsl()) {
             // setup the SslSession header

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
index 5f36ff6..83520f5 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
@@ -18,11 +18,12 @@ package org.apache.camel.component.netty4;
 
 import java.net.SocketAddress;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.NoTypeConversionAvailableException;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +91,9 @@ public final class NettyHelper {
             if (log.isDebugEnabled()) {
                 log.debug("Channel: {} remote address: {} writing body: {}", new Object[]{channel,
remoteAddress, body});
             }
-            future = channel.write(body, remoteAddress);
+            //TODO need to check if we don't need to set the remoteAddress for the UDP channel
+            //future = channel.write(body, remoteAddress);
+            future = channel.write(body);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Channel: {} writing body: {}", new Object[]{channel, body});

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 58b3228..2a02031 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -253,7 +253,7 @@ public class NettyProducer extends DefaultAsyncProducer {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {
                     // no success the set the caused exception and signal callback and break
-                    exchange.setException(channelFuture.getCause());
+                    exchange.setException(channelFuture.cause());
                     producerCallback.done(false);
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
index 3b782db..901c298 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.netty4;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPipelineFactory;
+
 
 /**
- * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyConsumer}.
+ * Factory to create {@link ChannelPipeline} for servers, eg {@link NettyConsumer}.
  * <p/>
  * Implementators must support creating a new instance of this factory which is associated
  * to the given {@link NettyConsumer} using the {@link #createPipelineFactory(NettyConsumer)}
@@ -28,7 +30,7 @@ import io.netty.channel.ChannelPipelineFactory;
  *
  * @see ChannelPipelineFactory
  */
-public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
+public abstract class ServerPipelineFactory extends ChannelInitializer<Channel> {
 
     /**
      * Creates a new {@link ClientPipelineFactory} using the given {@link NettyConsumer}

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
index f6e8d03..b4b7daf 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4;
 
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 
 /**
  * A {@link ChannelHandlerFactory} returning a shareable {@link ChannelHandler}.
@@ -33,4 +34,22 @@ public class ShareableChannelHandlerFactory implements ChannelHandlerFactory
{
     public ChannelHandler newChannelHandler() {
         return channelHandler;
     }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
+        // TODO Auto-generated method stub
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index e681d5b..c975afd 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty4.handlers;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
@@ -26,19 +28,13 @@ import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
 import org.apache.camel.component.netty4.NettyProducer;
 import org.apache.camel.util.ExchangeHelper;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelStateEvent;
-import io.netty.channel.ExceptionEvent;
-import io.netty.channel.MessageEvent;
-import io.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Client handler which cannot be shared
  */
-public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
+public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     // use NettyProducer as logger to make it easier to read the logs as this is part of
the producer
     private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
     private final NettyProducer producer;
@@ -50,18 +46,18 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent)
throws Exception {
+    public void channelActive(ChannelHandlerContext ctx) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Channel open: {}", ctx.getChannel());
+            LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
-        producer.getAllChannels().add(channelStateEvent.getChannel());
+        producer.getAllChannels().add(ctx.channel());
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Exception caught at Channel: " + ctx.getChannel(), exceptionEvent.getCause());
+            LOG.trace("Exception caught at Channel: " + ctx.channel(), cause);
         }
 
         if (exceptionHandled) {
@@ -70,7 +66,6 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
         }
 
         exceptionHandled = true;
-        Throwable cause = exceptionEvent.getCause();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel as an exception was thrown from Netty", cause);
@@ -85,7 +80,7 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
             exchange.setException(cause);
 
             // close channel in case an exception was thrown
-            NettyHelper.close(exceptionEvent.getChannel());
+            NettyHelper.close(ctx.channel());
 
             // signal callback
             callback.done(false);
@@ -93,19 +88,19 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
+    public void channelInactive(ChannelHandlerContext ctx) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Channel closed: {}", ctx.getChannel());
+            LOG.trace("Channel closed: {}", ctx.channel());
         }
 
         Exchange exchange = getExchange(ctx);
         AsyncCallback callback = getAsyncCallback(ctx);
 
         // remove state
-        producer.removeState(ctx.getChannel());
+        producer.removeState(ctx.channel());
 
         // to keep track of open sockets
-        producer.getAllChannels().remove(ctx.getChannel());
+        producer.getAllChannels().remove(ctx.channel());
 
         if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled)
{
             // To avoid call the callback.done twice
@@ -122,7 +117,8 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws
Exception {
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        // TODO Auto-generated method stub
         messageReceived = true;
 
         if (LOG.isTraceEnabled()) {
@@ -146,7 +142,7 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
 
         Message message;
         try {
-            message = getResponseMessage(exchange, messageEvent);
+            message = getResponseMessage(exchange, ctx, msg);
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(false);
@@ -183,7 +179,7 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Closing channel when complete at address: {}", producer.getConfiguration().getAddress());
                 }
-                NettyHelper.close(ctx.getChannel());
+                NettyHelper.close(ctx.channel());
             }
         } finally {
             // signal callback
@@ -197,19 +193,22 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
      * <p/>
      *
      * @param exchange      the current exchange
-     * @param messageEvent  the incoming event which has the response message from Netty.
+     * @param ctx       the channel handler context
+     * @param message  the incoming event which has the response message from Netty.
      * @return the Camel {@link Message} to set on the current {@link Exchange} as the response
message.
      * @throws Exception is thrown if error getting the response message
      */
-    protected Message getResponseMessage(Exchange exchange, MessageEvent messageEvent) throws
Exception {
-        Object body = messageEvent.getMessage();
+    protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object
message) throws Exception {
+
+        Object body = message;
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Channel: {} received body: {}", new Object[]{messageEvent.getChannel(),
body});
+            LOG.debug("Channel: {} received body: {}", new Object[]{ctx.channel(), body});
         }
 
         // if textline enabled then covert to a String which must be used for textline
         if (producer.getConfiguration().isTextline()) {
-            body = producer.getContext().getTypeConverter().mandatoryConvertTo(String.class,
exchange, body);
+            body = producer.getContext().getTypeConverter().mandatoryConvertTo(String.class,
exchange, message);
         }
 
         // set the result on either IN or OUT on the original exchange depending on its pattern
@@ -223,13 +222,15 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler
{
     }
 
     private Exchange getExchange(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.getChannel());
+        NettyCamelState state = producer.getState(ctx.channel());
         return state != null ? state.getExchange() : null;
     }
 
     private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.getChannel());
+        NettyCamelState state = producer.getState(ctx.channel());
         return state != null ? state.getCallback() : null;
     }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index a20c3ab..d800b5d 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -18,6 +18,10 @@ package org.apache.camel.component.netty4.handlers;
 
 import java.net.SocketAddress;
 
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -26,68 +30,61 @@ import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.IOHelper;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelStateEvent;
-import io.netty.channel.ExceptionEvent;
-import io.netty.channel.MessageEvent;
-import io.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Client handler which cannot be shared
  */
-public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
+public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
     // use NettyConsumer as logger to make it easier to read the logs as this is part of
the consumer
     private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
     private final NettyConsumer consumer;
     private final CamelLogger noReplyLogger;
 
     public ServerChannelHandler(NettyConsumer consumer) {
-        this.consumer = consumer;    
+        this.consumer = consumer;
         this.noReplyLogger = new CamelLogger(LOG, consumer.getConfiguration().getNoReplyLogLevel());
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
+    public void channelActive(ChannelHandlerContext ctx) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Channel open: {}", e.getChannel());
+            LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
-        consumer.getNettyServerBootstrapFactory().addChannel(e.getChannel());
+        consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
+    public void channelInactive(ChannelHandlerContext ctx) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Channel closed: {}", e.getChannel());
+            LOG.trace("Channel closed: {}", ctx.channel());
         }
         // to keep track of open sockets
-        consumer.getNettyServerBootstrapFactory().removeChannel(e.getChannel());
+        consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
         // only close if we are still allowed to run
         if (consumer.isRunAllowed()) {
             // let the exception handler deal with it
-            consumer.getExceptionHandler().handleException("Closing channel as an exception
was thrown from Netty", exceptionEvent.getCause());
+            consumer.getExceptionHandler().handleException("Closing channel as an exception
was thrown from Netty", cause);
             // close channel in case an exception was thrown
-            NettyHelper.close(exceptionEvent.getChannel());
+            NettyHelper.close(ctx.channel());
         }
     }
-    
+
     @Override
-    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent)
throws Exception {
-        Object in = messageEvent.getMessage();
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        Object in = msg;
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Channel: {} received body: {}", new Object[]{messageEvent.getChannel(),
in});
+            LOG.debug("Channel: {} received body: {}", new Object[]{ctx.channel(), in});
         }
 
         // create Exchange and let the consumer process it
-        final Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
-
+        final Exchange exchange = consumer.getEndpoint().createExchange(ctx, msg);
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
@@ -103,9 +100,9 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
 
         // process accordingly to endpoint configuration
         if (consumer.getEndpoint().isSynchronous()) {
-            processSynchronously(exchange, messageEvent);
+            processSynchronously(exchange, ctx, msg);
         } else {
-            processAsynchronously(exchange, messageEvent);
+            processAsynchronously(exchange, ctx, msg);
         }
     }
 
@@ -113,17 +110,18 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
      * Allows any custom logic before the {@link Exchange} is processed by the routing engine.
      *
      * @param exchange       the exchange
-     * @param messageEvent   the Netty message event
+     * @param ctx            the channel handler context
+     * @param message        the message which needs to be sent
      */
-    protected void beforeProcess(final Exchange exchange, final MessageEvent messageEvent)
{
+    protected void beforeProcess(final Exchange exchange, final ChannelHandlerContext ctx,
final Object message) {
         // noop
     }
 
-    private void processSynchronously(final Exchange exchange, final MessageEvent messageEvent)
{
+    private void processSynchronously(final Exchange exchange, final ChannelHandlerContext
ctx, final Object message) {
         try {
             consumer.getProcessor().process(exchange);
             if (consumer.getConfiguration().isSync()) {
-                sendResponse(messageEvent, exchange);
+                sendResponse(message, ctx, exchange);
             }
         } catch (Throwable e) {
             consumer.getExceptionHandler().handleException(e);
@@ -132,14 +130,14 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
         }
     }
 
-    private void processAsynchronously(final Exchange exchange, final MessageEvent messageEvent)
{
+    private void processAsynchronously(final Exchange exchange, final ChannelHandlerContext
ctx, final Object message) {
         consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
             @Override
             public void done(boolean doneSync) {
                 // send back response if the communication is synchronous
                 try {
                     if (consumer.getConfiguration().isSync()) {
-                        sendResponse(messageEvent, exchange);
+                        sendResponse(message, ctx, exchange);
                     }
                 } catch (Throwable e) {
                     consumer.getExceptionHandler().handleException(e);
@@ -150,7 +148,7 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
         });
     }
 
-    private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception
{
+    private void sendResponse(Object message, ChannelHandlerContext ctx, Exchange exchange)
throws Exception {
         Object body = getResponseBody(exchange);
 
         if (body == null) {
@@ -159,9 +157,9 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
                 // must close session if no data to write otherwise client will never receive
a response
                 // and wait forever (if not timing out)
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closing channel as no payload to send as reply at address:
{}", messageEvent.getRemoteAddress());
+                    LOG.trace("Closing channel as no payload to send as reply at address:
{}", ctx.channel().remoteAddress());
                 }
-                NettyHelper.close(messageEvent.getChannel());
+                NettyHelper.close(ctx.channel());
             }
         } else {
             // if textline enabled then covert to a String which must be used for textline
@@ -170,11 +168,11 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler
{
             }
 
             // we got a body to write
-            ChannelFutureListener listener = createResponseFutureListener(consumer, exchange,
messageEvent.getRemoteAddress());
+            ChannelFutureListener listener = createResponseFutureListener(consumer, exchange,
ctx.channel().remoteAddress());
             if (consumer.getConfiguration().isTcp()) {
-                NettyHelper.writeBodyAsync(LOG, messageEvent.getChannel(), null, body, exchange,
listener);
+                NettyHelper.writeBodyAsync(LOG, ctx.channel(), null, body, exchange, listener);
             } else {
-                NettyHelper.writeBodyAsync(LOG, messageEvent.getChannel(), messageEvent.getRemoteAddress(),
body, exchange, listener);
+                NettyHelper.writeBodyAsync(LOG, ctx.channel(), ctx.channel().remoteAddress(),
body, exchange, listener);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/08630d21/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java
index b66b8db..759b69d 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerResponseFutureListener.java
@@ -18,13 +18,14 @@ package org.apache.camel.component.netty4.handlers;
 
 import java.net.SocketAddress;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConsumer;
 import org.apache.camel.component.netty4.NettyHelper;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ public class ServerResponseFutureListener implements ChannelFutureListener
{
     public void operationComplete(ChannelFuture future) throws Exception {
         // if it was not a success then thrown an exception
         if (!future.isSuccess()) {
-            Exception e = new CamelExchangeException("Cannot write response to " + remoteAddress,
exchange, future.getCause());
+            Exception e = new CamelExchangeException("Cannot write response to " + remoteAddress,
exchange, future.cause());
             consumer.getExceptionHandler().handleException(e);
         }
 
@@ -71,7 +72,7 @@ public class ServerResponseFutureListener implements ChannelFutureListener
{
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Closing channel when complete at address: {}", remoteAddress);
             }
-            NettyHelper.close(future.getChannel());
+            NettyHelper.close(future.channel());
         }
     }
 }


Mime
View raw message