camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1424386 - in /camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty: NettyProducer.java handlers/ClientChannelHandler.java
Date Thu, 20 Dec 2012 10:03:20 GMT
Author: davsclaus
Date: Thu Dec 20 10:03:20 2012
New Revision: 1424386

URL: http://svn.apache.org/viewvc?rev=1424386&view=rev
Log:
CAMEL-5899: netty producer should honor connection timeout while waiting for netty to open
connection. And keep better track of open/closed channels.

Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424386&r1=1424385&r2=1424386&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Thu Dec 20 10:03:20 2012
@@ -18,10 +18,8 @@ package org.apache.camel.component.netty
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -305,7 +303,7 @@ public class NettyProducer extends Defau
         }
     }
 
-    private ChannelFuture openConnection() throws Exception {
+    protected ChannelFuture openConnection() throws Exception {
         ChannelFuture answer;
 
         if (isTcp()) {
@@ -365,20 +363,13 @@ public class NettyProducer extends Defau
     }
 
     private Channel openChannel(ChannelFuture channelFuture) throws Exception {
-        // wait until until the operation is complete
-        final CountDownLatch latch = new CountDownLatch(1);
-        channelFuture.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                LOG.trace("Operation complete {}", channelFuture);
-                latch.countDown();
-            }
-        });
         // blocking for channel to be done
-        LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
-        latch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture,
configuration.getConnectTimeout());
+        }
+        channelFuture.awaitUninterruptibly(configuration.getConnectTimeout());
 
-        if (!channelFuture.isSuccess()) {
+        if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
             throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
         }
         Channel answer = channelFuture.getChannel();

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1424386&r1=1424385&r2=1424386&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Thu Dec 20 10:03:20 2012
@@ -50,6 +50,9 @@ public class ClientChannelHandler extend
 
     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent)
throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Channel open: {}", ctx.getChannel());
+        }
         // to keep track of open sockets
         producer.getAllChannels().add(channelStateEvent.getChannel());
     }
@@ -90,7 +93,9 @@ public class ClientChannelHandler extend
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
-        LOG.trace("Channel closed: {}", ctx.getChannel());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Channel closed: {}", ctx.getChannel());
+        }
 
         Exchange exchange = getExchange(ctx);
         AsyncCallback callback = getAsyncCallback(ctx);
@@ -98,6 +103,9 @@ public class ClientChannelHandler extend
         // remove state
         producer.removeState(ctx.getChannel());
 
+        // to keep track of open sockets
+        producer.getAllChannels().remove(ctx.getChannel());
+
         if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled)
{
             // session was closed but no message received. This could be because the remote
server had an internal error
             // and could not return a response. We should count down to stop waiting for
a response



Mime
View raw message