camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1310991 - in /camel/branches/camel-2.9.x: ./ components/camel-netty/src/main/java/org/apache/camel/component/netty/ components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ components/camel-netty/src/test/java/org/ap...
Date Sun, 08 Apr 2012 12:52:18 GMT
Author: davsclaus
Date: Sun Apr  8 12:52:17 2012
New Revision: 1310991

URL: http://svn.apache.org/viewvc?rev=1310991&view=rev
Log:
CAMEL-5150: Some cleanup in camel-netty according to Netty docs.

Added:
    camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
      - copied unchanged from r1310979, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1310979

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sun Apr  8 12:52:17 2012
@@ -68,8 +68,6 @@ public class NettyConfiguration implemen
     private long sendBufferSize = 65536;
     private long receiveBufferSize = 65536;
     private int receiveBufferSizePredictor;
-    private int corePoolSize = 10;
-    private int maxPoolSize = 100;
     private int workerCount;
     private String keyStoreFormat;
     private String securityProvider;
@@ -385,22 +383,6 @@ public class NettyConfiguration implemen
         this.trustStoreFile = trustStoreFile;
     }
 
-    public int getCorePoolSize() {
-        return corePoolSize;
-    }
-
-    public void setCorePoolSize(int corePoolSize) {
-        this.corePoolSize = corePoolSize;
-    }
-
-    public int getMaxPoolSize() {
-        return maxPoolSize;
-    }
-
-    public void setMaxPoolSize(int maxPoolSize) {
-        this.maxPoolSize = maxPoolSize;
-    }
-
     public String getKeyStoreFormat() {
         return keyStoreFormat;
     }

Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Sun Apr  8 12:52:17 2012
@@ -46,12 +46,14 @@ public class NettyConsumer extends Defau
     private ServerBootstrap serverBootstrap;
     private ConnectionlessBootstrap connectionlessServerBootstrap;
     private Channel channel;
+    private ExecutorService bossExecutor;
+    private ExecutorService workerExecutor;
 
     public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
         super(nettyEndpoint, processor);
         this.context = this.getEndpoint().getCamelContext();
         this.configuration = configuration;
-        this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
+        this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri());
     }
 
     @Override
@@ -78,14 +80,23 @@ public class NettyConsumer extends Defau
         LOG.debug("Netty consumer unbinding from: {}", configuration.getAddress());
 
         // close all channels
+        LOG.trace("Closing {} channels", allChannels.size());
         ChannelGroupFuture future = allChannels.close();
         future.awaitUninterruptibly();
 
-        // and then release other resources
+        // close server external resources
         if (channelFactory != null) {
             channelFactory.releaseExternalResources();
         }
 
+        // and then shutdown the thread pools
+        if (bossExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+        }
+        if (workerExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+        }
+
         super.doStop();
 
         LOG.info("Netty consumer unbound from: " + configuration.getAddress());
@@ -144,12 +155,10 @@ public class NettyConsumer extends Defau
     }
 
     private void initializeTCPServerSocketCommunicationLayer() throws Exception {
-        ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
-                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-        ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
-                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+        bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+        workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
 
-        if (configuration.getWorkerCount() == 0) {
+        if (configuration.getWorkerCount() <= 0) {
             channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
         } else {
             channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
@@ -164,6 +173,7 @@ public class NettyConsumer extends Defau
         }
         serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
         serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
         serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
 
@@ -173,10 +183,12 @@ public class NettyConsumer extends Defau
     }
 
     private void initializeUDPServerSocketCommunicationLayer() throws Exception {
-        ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
-                configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-
-        datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+        workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+        if (configuration.getWorkerCount() <= 0) {
+            datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+        } else {
+            datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
+        }
         connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
         if (configuration.getServerPipelineFactory() != null) {
             configuration.getServerPipelineFactory().setConsumer(this);
@@ -186,6 +198,7 @@ public class NettyConsumer extends Defau
         }
         connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        connectionlessServerBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
         connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
         connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
         connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());

Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr  8 12:52:17 2012
@@ -55,6 +55,8 @@ public class NettyProducer extends Defau
     private ChannelFactory channelFactory;
     private DatagramChannelFactory datagramChannelFactory;
     private CamelLogger noReplyLogger;
+    private ExecutorService bossExecutor;
+    private ExecutorService workerExecutor;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
         super(nettyEndpoint);
@@ -103,6 +105,7 @@ public class NettyProducer extends Defau
     protected void doStop() throws Exception {
         LOG.debug("Stopping producer at address: {}", configuration.getAddress());
         // close all channels
+        LOG.trace("Closing {} channels", ALL_CHANNELS.size());
         ChannelGroupFuture future = ALL_CHANNELS.close();
         future.awaitUninterruptibly();
 
@@ -110,6 +113,15 @@ public class NettyProducer extends Defau
         if (channelFactory != null) {
             channelFactory.releaseExternalResources();
         }
+
+        // and then shutdown the thread pools
+        if (bossExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(bossExecutor);
+        }
+        if (workerExecutor != null) {
+            context.getExecutorServiceManager().shutdownNow(workerExecutor);
+        }
+
         super.doStop();
     }
 
@@ -208,18 +220,15 @@ public class NettyProducer extends Defau
 
     protected void setupTCPCommunication() throws Exception {
         if (channelFactory == null) {
-            ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
-                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
-            ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
-                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+            bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+            workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
             channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
         }
     }
 
     protected void setupUDPCommunication() throws Exception {
         if (datagramChannelFactory == null) {
-            ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
-                    configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+            workerExecutor = context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
             datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
         }
     }
@@ -243,16 +252,17 @@ public class NettyProducer extends Defau
 
         if (isTcp()) {
             ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
-            clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-            clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-            clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-            clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+            clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+            clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+            clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+            clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
 
             // set the pipeline on the bootstrap
             clientBootstrap.setPipeline(clientPipeline);
             answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
             return answer;
         } else {
+            // TODO: Is this correct for a UDP client
             ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
             connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());

Modified: camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Sun Apr  8 12:52:17 2012
@@ -42,7 +42,7 @@ public class ClientChannelHandler extend
     private final Exchange exchange;
     private final AsyncCallback callback;
     private boolean messageReceived;
-    private boolean exceptionHandled;
+    private volatile boolean exceptionHandled;
 
     public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback callback) {
         this.producer = producer;

Modified: camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties (original)
+++ camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties Sun Apr  8 12:52:17 2012
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file
 # uncomment the following to enable camel debugging
 #log4j.logger.org.apache.camel.component.netty=TRACE
 #log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.jboss.netty=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender



Mime
View raw message