camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/3] git commit: CAMEL-5819: Added requestTimeout option to netty producer. As well options to control logging level on netty consumer, so its less noisy by default about channel closed, when clients disconnect abruptly.
Date Tue, 04 Jun 2013 14:47:50 GMT
Updated Branches:
  refs/heads/camel-2.11.x bcb0f0048 -> 048601dc5
  refs/heads/master 82d0e332a -> 7ef115ec1


CAMEL-5819: Added requestTimeout option to netty producer. As well options to control logging
level on netty consumer, so its less noisy by default about channel closed, when clients disconnect
abruptly.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c5d470e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c5d470e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c5d470e

Branch: refs/heads/master
Commit: 7c5d470ed753e7d7a7bab1a00fa13c9f9d9ac75d
Parents: 82d0e33
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Jun 4 16:46:00 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Jun 4 16:46:00 2013 +0200

----------------------------------------------------------------------
 .../netty/DefaultClientPipelineFactory.java        |   11 ++
 .../camel/component/netty/NettyConfiguration.java  |   27 +++++
 .../camel/component/netty/NettyConsumer.java       |    1 +
 .../netty/NettyConsumerExceptionHandler.java       |   66 +++++++++++
 .../netty/handlers/ServerChannelHandler.java       |    9 +-
 .../component/netty/NettyRequestTimeoutTest.java   |   84 +++++++++++++++
 6 files changed, 193 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
index cc7cc05..9503fac 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
@@ -27,6 +28,7 @@ import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +77,15 @@ public class DefaultClientPipelineFactory extends ClientPipelineFactory
 {
             addToPipeline("encoder-" + x, channelPipeline, encoder);
         }
 
+        // do we use request timeout?
+        if (producer.getConfiguration().getRequestTimeout() > 0) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
+            }
+            ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(),
TimeUnit.MILLISECONDS);
+            addToPipeline("timeout", channelPipeline, timeout);
+        }
+
         // our handler must be added last
         addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
 

http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index 33a092a..01fd17e 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -47,6 +47,7 @@ public class NettyConfiguration implements Cloneable {
     private boolean tcpNoDelay = true;
     private boolean broadcast;
     private long connectTimeout = 10000;
+    private long requestTimeout;
     private boolean reuseAddress = true;
     private boolean sync = true;
     private boolean textline;
@@ -74,6 +75,8 @@ public class NettyConfiguration implements Cloneable {
     private boolean transferExchange;
     private boolean disconnectOnNoReply = true;
     private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
     private boolean allowDefaultCodec = true;
     private ClientPipelineFactory clientPipelineFactory;
     private ServerPipelineFactory serverPipelineFactory;
@@ -283,6 +286,14 @@ public class NettyConfiguration implements Cloneable {
         this.connectTimeout = connectTimeout;
     }
 
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
     public boolean isReuseAddress() {
         return reuseAddress;
     }
@@ -523,6 +534,22 @@ public class NettyConfiguration implements Cloneable {
         this.noReplyLogLevel = noReplyLogLevel;
     }
 
+    public LoggingLevel getServerExceptionCaughtLogLevel() {
+        return serverExceptionCaughtLogLevel;
+    }
+
+    public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel)
{
+        this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel;
+    }
+
+    public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() {
+        return serverClosedChannelExceptionCaughtLogLevel;
+    }
+
+    public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel)
{
+        this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel;
+    }
+
     public boolean isAllowDefaultCodec() {
         return allowDefaultCodec;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index 595e587..46dbb5b 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -56,6 +56,7 @@ public class NettyConsumer extends DefaultConsumer {
         this.context = this.getEndpoint().getCamelContext();
         this.configuration = configuration;
         this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri());
+        setExceptionHandler(new NettyConsumerExceptionHandler(this));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
new file mode 100644
index 0000000..845b189
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConsumerExceptionHandler implements ExceptionHandler {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+    private final CamelLogger logger;
+    private final LoggingLevel closedLoggingLevel;
+
+    public NettyConsumerExceptionHandler(NettyConsumer consumer) {
+        this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel());
+        this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel();
+    }
+
+    @Override
+    public void handleException(Throwable exception) {
+        handleException(null, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Throwable exception) {
+        handleException(message, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Exchange exchange, Throwable exception) {
+        try {
+            String msg = CamelExchangeException.createExceptionMessage(message, exchange,
exception);
+            boolean closed = ObjectHelper.getException(ClosedChannelException.class, exception)
!= null;
+            if (closed) {
+                logger.log(msg, exception, closedLoggingLevel);
+            } else {
+                logger.log(msg, exception);
+            }
+        } catch (Throwable e) {
+            // the logging exception handler must not cause new exceptions to occur
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index a509692..211e37f 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -25,7 +25,6 @@ import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyPayloadHelper;
 import org.apache.camel.util.CamelLogger;
-import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -42,8 +41,8 @@ import org.slf4j.LoggerFactory;
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     // use NettyConsumer as logger to make it easier to read the logs as this is part of
the consumer
     private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
-    private NettyConsumer consumer;
-    private CamelLogger noReplyLogger;
+    private final NettyConsumer consumer;
+    private final CamelLogger noReplyLogger;
 
     public ServerChannelHandler(NettyConsumer consumer) {
         this.consumer = consumer;    
@@ -72,8 +71,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
throws Exception {
         // only close if we are still allowed to run
         if (consumer.isRunAllowed()) {
-            LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
-
+            // let the exception handler deal with it
+            consumer.getExceptionHandler().handleException("Closing channel as an exception
was thrown from Netty", exceptionEvent.getCause());
             // close channel in case an exception was thrown
             NettyHelper.close(exceptionEvent.getChannel());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/7c5d470e/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
new file mode 100644
index 0000000..94f9e79
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class NettyRequestTimeoutTest extends BaseNettyTest {
+
+    @Test
+    public void testRequestTimeoutOK() throws Exception {
+        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=5000",
"Hello Camel", String.class);
+        assertEquals("Bye World", out);
+    }
+
+    @Test
+    public void testRequestTimeout() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000",
"Hello Camel", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+
+    @Test
+    public void testRequestTimeoutAndOk() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000",
"Hello Camel", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+
+        // now we try again but this time the is no delay on server and thus faster
+        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000",
"Hello World", String.class);
+        assertEquals("Bye World", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            String body = exchange.getIn().getBody(String.class);
+
+                            if (body.contains("Camel")) {
+                                Thread.sleep(3000);
+                            }
+                        }
+                    })
+                    .transform().constant("Bye World");
+
+            }
+        };
+    }
+}


Mime
View raw message