camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1215469 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/main/java/org/apache/camel/util...
Date Sat, 17 Dec 2011 11:52:30 GMT
Author: davsclaus
Date: Sat Dec 17 11:52:29 2011
New Revision: 1215469

URL: http://svn.apache.org/viewvc?rev=1215469&view=rev
Log:
CAMEL-4786: Add sized scheduled thread pool to ensure scheduled thread pools do not eat up
memory, as the queue of the JDK pools is unbounded. Fixed throttler and delayer to use thread pool
profile so end users can customize the core pool size with these EIPs.

Added:
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
      - copied unchanged from r1215240, camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
      - copied, changed from r1215240, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
      - copied unchanged from r1215240, camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 11:52:29 2011
@@ -1 +1 @@
-/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215448
+/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215240,1215448

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
(original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
Sat Dec 17 11:52:29 2011
@@ -38,6 +38,7 @@ import org.apache.camel.spi.LifecycleStr
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,7 +173,11 @@ public class DefaultExecutorServiceStrat
                 if (poolSize == null) {
                     poolSize = getDefaultThreadPoolProfile().getPoolSize();
                 }
-                answer = newScheduledThreadPool(source, name, poolSize);
+                Integer maxQueueSize = profile.getMaxQueueSize();
+                if (maxQueueSize == null) {
+                    maxQueueSize = getDefaultThreadPoolProfile().getMaxQueueSize();
+                }
+                answer = newScheduledThreadPool(source, name, poolSize, maxQueueSize);
                 if (answer != null) {
                     LOG.debug("Looking up ScheduledExecutorService with ref: {} and found
a matching ThreadPoolProfile to create the ScheduledExecutorService: {}",
                             executorServiceRef, answer);
@@ -224,15 +229,20 @@ public class DefaultExecutorServiceStrat
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, String name) {
         int poolSize = getDefaultThreadPoolProfile().getPoolSize();
-        return newScheduledThreadPool(source, name, poolSize);
+        int queueSize = getDefaultThreadPoolProfile().getMaxQueueSize();
+        return newScheduledThreadPool(source, name, poolSize, queueSize);
     }
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int
poolSize) {
-        ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize,
threadNamePattern, name, true);
+        return newScheduledThreadPool(source, name, poolSize, 0);
+    }
+
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int
poolSize, int maxQueueSize) {
+        ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize,
maxQueueSize, threadNamePattern, name, true);
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Created new scheduled thread pool for source: {} with name: {}. [poolSize={}].
-> {}", new Object[]{source, name, poolSize, answer});
+            LOG.debug("Created new scheduled thread pool for source: {} with name: {}. [poolSize={},
maxQueueSize={}]. -> {}", new Object[]{source, name, poolSize, maxQueueSize, answer});
         }
         return answer;
     }
@@ -392,8 +402,13 @@ public class DefaultExecutorServiceStrat
         }
 
         // let lifecycle strategy be notified as well which can let it be managed in JMX
as well
+        ThreadPoolExecutor threadPool = null;
         if (executorService instanceof ThreadPoolExecutor) {
-            ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+            threadPool = (ThreadPoolExecutor) executorService;
+        } else if (executorService instanceof SizedScheduledExecutorService) {
+            threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
+        }
+        if (threadPool != null) {
             for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
                 lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId,
threadPoolProfileId);
             }

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Sat Dec 17 11:52:29 2011
@@ -84,6 +84,7 @@ public abstract class DelayProcessorSupp
         long delay = calculateDelay(exchange);
         if (delay <= 0) {
             // no delay then continue routing
+            log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
             return super.process(exchange, callback);
         }
 
@@ -113,6 +114,7 @@ public abstract class DelayProcessorSupp
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
                     } else {
+                        log.debug("Scheduling rejected task, so letting caller run, delaying
at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
                         // let caller run by processing
                         try {
                             delay(delay, exchange);

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
(original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Sat Dec 17 11:52:29 2011
@@ -165,6 +165,17 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new scheduled thread pool.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @param maxQueueSize the max queue size, use 0 or negative for unbounded
+     * @return the created thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize,
int maxQueueSize);
+
+    /**
+     * Creates a new scheduled thread pool.
      * <p/>
      * Will use the pool size from the default thread pool profile
      *

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
(original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Sat Dec 17 11:52:29 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -106,7 +107,31 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final
String pattern, final String name, final boolean daemon) {
-        return Executors.newScheduledThreadPool(poolSize, new CamelThreadFactory(pattern,
name, daemon));
+        return newScheduledThreadPool(poolSize, 0, pattern, name, daemon);
+    }
+
+    /**
+     * Creates a new scheduled thread pool which can schedule threads.
+     *
+     * @param poolSize the core pool size
+     * @param maxQueueSize max queue size, use 0 or negative for unbounded
+     * @param pattern  pattern of the thread name
+     * @param name     ${name} in the pattern name
+     * @param daemon   whether the threads is daemon or not
+     * @return the created pool
+     */
+    public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final
int maxQueueSize, final String pattern, final String name, final boolean daemon) {
+        ScheduledThreadPoolExecutor answer = new ScheduledThreadPoolExecutor(poolSize, new
CamelThreadFactory(pattern, name, daemon));
+        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
+
+        // need to wrap the thread pool in a sized to guard against the problem that the
+        // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+        // we could potentially keep adding tasks, and run out of memory.
+        if (maxQueueSize > 0) {
+            return new SizedScheduledExecutorService(answer, maxQueueSize);
+        } else {
+            return answer;
+        }
     }
 
     /**

Copied: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
(from r1215240, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java?p2=camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&r1=1215240&r2=1215469&rev=1215469&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
(original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
Sat Dec 17 11:52:29 2011
@@ -16,14 +16,15 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.impl.ThreadPoolProfileSupport;
+import org.apache.camel.management.ManagementTestSupport;
+import org.apache.camel.spi.ThreadPoolProfile;
 
 /**
  *
  */
-public class ThrottlerAsyncDelayedCallerRunsTest extends ContextTestSupport {
+public class ThrottlerAsyncDelayedCallerRunsTest extends ManagementTestSupport {
     
     public void testThrottler() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(6);
@@ -44,9 +45,10 @@ public class ThrottlerAsyncDelayedCaller
             @Override
             public void configure() throws Exception {
                 // create a profile for the throttler
-                ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder("myThrottler");
-                builder.maxQueueSize(2);
-                context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
+                ThreadPoolProfile profile = new ThreadPoolProfileSupport("myThrottler");
+                profile.setMaxPoolSize(5);
+                profile.setMaxQueueSize(2);
+                context.getExecutorServiceStrategy().registerThreadPoolProfile(profile);
                 
                 from("seda:start")
                     .throttle(1).timePeriodMillis(100)



Mime
View raw message