camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1398047 - in /camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty: DefaultServerPipelineFactory.java NettyComponent.java NettyEndpoint.java
Date Sun, 14 Oct 2012 10:26:57 GMT
Author: davsclaus
Date: Sun Oct 14 10:26:56 2012
New Revision: 1398047

URL: http://svn.apache.org/viewvc?rev=1398047&view=rev
Log:
CAMEL-5702: The ordered thread pool should be shared.

Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1398047&r1=1398046&r2=1398047&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Sun Oct 14 10:26:56 2012
@@ -17,19 +17,15 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
-import org.apache.camel.util.concurrent.CamelThreadFactory;
 import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,15 +70,11 @@ public class DefaultServerPipelineFactor
         }
 
         if (consumer.getConfiguration().isOrderedThreadPoolExecutor()) {
+            // this must be added just before the ServerChannelHandler
             // use ordered thread pool, to ensure we process the events in order, and can send back
             // replies in the expected order. eg this is required by TCP.
             // and use a Camel thread factory so we have consistent thread namings
-            String pattern = consumer.getContext().getExecutorServiceManager().getThreadNamePattern();
-            ThreadFactory factory = new CamelThreadFactory(pattern, "NettyOrderedWorker", true);
-            final ExecutionHandler executionHandler = new ExecutionHandler(
-                    new OrderedMemoryAwareThreadPoolExecutor(consumer.getConfiguration().getMaximumPoolSize(),
-                            0L, 0L, 30, TimeUnit.SECONDS, factory));
-            // this must be added just before the ServerChannelHandler
+            ExecutionHandler executionHandler = new ExecutionHandler(consumer.getEndpoint().getComponent().getExecutorService());
             addToPipeline("executionHandler", channelPipeline, executionHandler);
             LOG.debug("Using OrderedMemoryAwareThreadPoolExecutor with core pool size: {}", consumer.getConfiguration().getMaximumPoolSize());
         }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=1398047&r1=1398046&r2=1398047&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Sun Oct 14 10:26:56 2012
@@ -18,10 +18,14 @@ package org.apache.camel.component.netty
 
 import java.net.URI;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timer;
 
@@ -29,6 +33,7 @@ public class NettyComponent extends Defa
     // use a shared timer for Netty (see javadoc for HashedWheelTimer)
     private static volatile Timer timer;
     private NettyConfiguration configuration;
+    private OrderedMemoryAwareThreadPoolExecutor executorService;
 
     public NettyComponent() {
     }
@@ -69,18 +74,50 @@ public class NettyComponent extends Defa
         return timer;
     }
 
+    public synchronized OrderedMemoryAwareThreadPoolExecutor getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
     @Override
     protected void doStart() throws Exception {
         if (timer == null) {
             timer = new HashedWheelTimer();
         }
+
+        if (configuration == null) {
+            configuration = new NettyConfiguration();
+        }
+        if (configuration.isOrderedThreadPoolExecutor()) {
+            executorService = createExecutorService();
+        }
+
         super.doStart();
     }
 
+    protected OrderedMemoryAwareThreadPoolExecutor createExecutorService() {
+        // use ordered thread pool, to ensure we process the events in order, and can send back
+        // replies in the expected order. eg this is required by TCP.
+        // and use a Camel thread factory so we have consistent thread namings
+        // we should use a shared thread pool as recommended by Netty
+        String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern();
+        ThreadFactory factory = new CamelThreadFactory(pattern, "NettyOrderedWorker", true);
+        return new OrderedMemoryAwareThreadPoolExecutor(configuration.getMaximumPoolSize(),
+                0L, 0L, 30, TimeUnit.SECONDS, factory);
+    }
+
     @Override
     protected void doStop() throws Exception {
         timer.stop();
         timer = null;
+
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+
         super.doStop();
     }
 

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=1398047&r1=1398046&r2=1398047&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Sun Oct 14 10:26:56 2012
@@ -62,6 +62,11 @@ public class NettyEndpoint extends Defau
         return true;
     }
 
+    @Override
+    public NettyComponent getComponent() {
+        return (NettyComponent) super.getComponent();
+    }
+
     public NettyConfiguration getConfiguration() {
         return configuration;
     }



Mime
View raw message