camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1157189 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/impl/
Date Fri, 12 Aug 2011 17:04:23 GMT
Author: davsclaus
Date: Fri Aug 12 17:04:23 2011
New Revision: 1157189

URL: http://svn.apache.org/viewvc?rev=1157189&view=rev
Log:
CAMEL-4298: Added ExecutorServiceStrategy to be backwards compatible. Marked it as @deprecated.
This ensures 3rd party components still work with future Camel releases.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=1157189&r1=1157188&r2=1157189&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Fri Aug 12 17:04:23
2011
@@ -33,6 +33,7 @@ import org.apache.camel.spi.DataFormatRe
 import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.InflightRepository;
@@ -925,6 +926,15 @@ public interface CamelContext extends Su
     ExecutorServiceManager getExecutorServiceManager();
 
     /**
+     * Gets the current {@link org.apache.camel.spi.ExecutorServiceStrategy}
+     *
+     * @return the strategy
+     * @deprecated use {@link #getExecutorServiceManager()}
+     */
+    @Deprecated
+    ExecutorServiceStrategy getExecutorServiceStrategy();
+
+    /**
      * Sets a custom {@link org.apache.camel.spi.ExecutorServiceManager}
      *
      * @param executorServiceManager the custom manager

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1157189&r1=1157188&r2=1157189&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri
Aug 12 17:04:23 2011
@@ -93,6 +93,7 @@ import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.InflightRepository;
@@ -2280,6 +2281,12 @@ public class DefaultCamelContext extends
         return this.executorServiceManager;
     }
 
+    public ExecutorServiceStrategy getExecutorServiceStrategy() {
+        // it's okay to create a new instance as it's stateless, and just delegates to
+        // ExecutorServiceManager which is the new API
+        return new DefaultExecutorServiceStrategy(this);
+    }
+
     public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager)
{
         this.executorServiceManager = executorServiceManager;
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1157189&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
Fri Aug 12 17:04:23 2011
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExecutorServiceStrategy;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
+
+/**
+ * @deprecated use {@link org.apache.camel.spi.ExecutorServiceManager} instead, will be removed
in a future Camel release
+ */
+@Deprecated
+public class DefaultExecutorServiceStrategy extends ServiceSupport implements ExecutorServiceStrategy
{
+
+    // delegate to ExecutorServiceManager
+
+    private final CamelContext camelContext;
+
+    public DefaultExecutorServiceStrategy(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public void registerThreadPoolProfile(ThreadPoolProfile profile) {
+        camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile);
+    }
+
+    public ThreadPoolProfile getThreadPoolProfile(String id) {
+        return camelContext.getExecutorServiceManager().getThreadPoolProfile(id);
+    }
+
+    public ThreadPoolProfile getDefaultThreadPoolProfile() {
+        return camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile();
+    }
+
+    public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
+        camelContext.getExecutorServiceManager().setDefaultThreadPoolProfile(defaultThreadPoolProfile);
+    }
+
+    public String getThreadName(String name) {
+        return camelContext.getExecutorServiceManager().resolveThreadName(name);
+    }
+
+    public String getThreadNamePattern() {
+        return camelContext.getExecutorServiceManager().getThreadNamePattern();
+    }
+
+    public void setThreadNamePattern(String pattern) throws IllegalArgumentException {
+        camelContext.getExecutorServiceManager().setThreadNamePattern(pattern);
+    }
+
+    public ExecutorService lookup(Object source, String name, String executorServiceRef)
{
+        ExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class);
+        if (answer == null) {
+            // try to see if we got a thread pool profile with that id
+            answer = newThreadPool(source, name, executorServiceRef);
+        }
+        return answer;
+    }
+
+    public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef)
{
+        ScheduledExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef,
ScheduledExecutorService.class);
+        if (answer == null) {
+            ThreadPoolProfile profile = getThreadPoolProfile(executorServiceRef);
+            if (profile != null) {
+                Integer poolSize = profile.getPoolSize();
+                if (poolSize == null) {
+                    poolSize = getDefaultThreadPoolProfile().getPoolSize();
+                }
+                answer = newScheduledThreadPool(source, name, poolSize);
+            }
+        }
+        return answer;
+    }
+
+    public ExecutorService newDefaultThreadPool(Object source, String name) {
+        return camelContext.getExecutorServiceManager().newDefaultThreadPool(source, name);
+    }
+
+    public ExecutorService newThreadPool(Object source, String name, String threadPoolProfileId)
{
+        return camelContext.getExecutorServiceManager().newThreadPool(source, name, threadPoolProfileId);
+    }
+
+    public ExecutorService newCachedThreadPool(Object source, String name) {
+        return camelContext.getExecutorServiceManager().newCachedThreadPool(source, name);
+    }
+
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int
poolSize) {
+        return camelContext.getExecutorServiceManager().newScheduledThreadPool(source, name,
poolSize);
+    }
+
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name) {
+        return camelContext.getExecutorServiceManager().newDefaultScheduledThreadPool(source,
name);
+    }
+
+    public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
+        return camelContext.getExecutorServiceManager().newFixedThreadPool(source, name,
poolSize);
+    }
+
+    public ExecutorService newSingleThreadExecutor(Object source, String name) {
+        return camelContext.getExecutorServiceManager().newSingleThreadExecutor(source, name);
+    }
+
+    public ExecutorService newSynchronousThreadPool(Object source, String name) {
+        return new SynchronousExecutorService();
+    }
+
+    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int
maxPoolSize) {
+        return camelContext.getExecutorServiceManager().newThreadPool(source, name, corePoolSize,
maxPoolSize);
+    }
+
+    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int
maxPoolSize, int maxQueueSize) {
+        // use a profile with the settings
+        ThreadPoolProfile profile = new ThreadPoolProfile();
+        profile.setPoolSize(corePoolSize);
+        profile.setMaxPoolSize(maxPoolSize);
+        profile.setMaxQueueSize(maxQueueSize);
+
+        return camelContext.getExecutorServiceManager().newThreadPool(source, name, profile);
+    }
+
+    public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int
maxPoolSize,
+                                         long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                                         RejectedExecutionHandler rejectedExecutionHandler,
boolean daemon) {
+        // use a profile with the settings
+        ThreadPoolProfile profile = new ThreadPoolProfile();
+        profile.setPoolSize(corePoolSize);
+        profile.setMaxPoolSize(maxPoolSize);
+        profile.setMaxQueueSize(maxQueueSize);
+        profile.setKeepAliveTime(keepAliveTime);
+        profile.setTimeUnit(timeUnit);
+
+        // must cast to ThreadPoolExecutor to be able to set the rejected execution handler
+        ThreadPoolExecutor answer = (ThreadPoolExecutor) camelContext.getExecutorServiceManager().newThreadPool(source,
name, profile);
+        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+        return answer;
+    }
+
+    public void shutdown(ExecutorService executorService) {
+        camelContext.getExecutorServiceManager().shutdown(executorService);
+    }
+
+    public List<Runnable> shutdownNow(ExecutorService executorService) {
+        return camelContext.getExecutorServiceManager().shutdownNow(executorService);
+    }
+
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    protected void doStop() throws Exception {
+        // noop
+    }
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=1157189&r1=1157188&r2=1157189&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
Fri Aug 12 17:04:23 2011
@@ -21,7 +21,7 @@ import org.apache.camel.spi.ThreadPoolPr
 /**
  * Use {@link ThreadPoolProfile} instead
  *
- * @deprecated use {@link ThreadPoolProfile} instead.
+ * @deprecated use {@link ThreadPoolProfile} instead, will be removed in a future Camel release
  */
 @Deprecated
 public class ThreadPoolProfileSupport extends ThreadPoolProfile {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1157189&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Fri Aug 12 17:04:23 2011
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ShutdownableService;
+
+/**
+ * Strategy to create thread pools.
+ * <p/>
+ * This strategy is pluggable so you can plugin a custom provider, for example if you want
to leverage
+ * the WorkManager for a J2EE server.
+ * <p/>
+ * This strategy has fine grained methods for creating various thread pools, however custom
strategies
+ * do not have to exactly create those kind of pools. Feel free to return a shared or different
kind of pool.
+ * <p/>
+ * However there are two types of pools: regular and scheduled.
+ * <p/>
+ * If you use the <tt>newXXX</tt> methods to create thread pools, then Camel
will by default take care of
+ * shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting
down.
+ *
+ * @deprecated use {@link ExecutorServiceManager} instead, will be removed in a future Camel
release
+ */
+@Deprecated
+public interface ExecutorServiceStrategy extends ShutdownableService {
+
+    /**
+     * Registers the given thread pool profile
+     *
+     * @param profile the profile
+     */
+    void registerThreadPoolProfile(ThreadPoolProfile profile);
+
+    /**
+     * Gets the thread pool profile by the given id
+     *
+     * @param id  id of the thread pool profile to get
+     * @return the found profile, or <tt>null</tt> if not found
+     */
+    ThreadPoolProfile getThreadPoolProfile(String id);
+
+    /**
+     * Gets the default thread pool profile
+     *
+     * @return the default profile which is never <tt>null</tt>
+     */
+    ThreadPoolProfile getDefaultThreadPoolProfile();
+
+    /**
+     * Sets the default thread pool profile
+     *
+     * @param defaultThreadPoolProfile the new default thread pool profile
+     */
+    void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile);
+
+    /**
+     * Creates a full thread name
+     *
+     * @param name  name which is appended to the full thread name
+     * @return the full thread name
+     */
+    String getThreadName(String name);
+
+    /**
+     * Gets the thread name pattern used for creating the full thread name.
+     *
+     * @return the pattern
+     */
+    String getThreadNamePattern();
+
+    /**
+     * Sets the thread name pattern used for creating the full thread name.
+     * <p/>
+     * The default pattern is: <tt>Camel (${camelId}) thread #${counter} - ${name}</tt>
+     * <p/>
+     * Where <tt>${camelId}</tt> is the name of the {@link org.apache.camel.CamelContext}
+     * <br/>and <tt>${counter}</tt> is a unique incrementing counter.
+     * <br/>and <tt>${name}</tt> is the regular thread name.
+     * <br/>You can also use <tt>${longName}</tt> which is the long thread name
and can include endpoint parameters etc.
+     *
+     * @param pattern  the pattern
+     * @throws IllegalArgumentException if the pattern is invalid.
+     */
+    void setThreadNamePattern(String pattern) throws IllegalArgumentException;
+
+    /**
+     * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry}
+     * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+     *
+     * @param source               the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name                 name which is appended to the thread name
+     * @param executorServiceRef   reference to lookup
+     * @return the {@link java.util.concurrent.ExecutorService} or <tt>null</tt>
if not found
+     */
+    ExecutorService lookup(Object source, String name, String executorServiceRef);
+
+    /**
+     * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry}
+     * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+     *
+     * @param source               the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name                 name which is appended to the thread name
+     * @param executorServiceRef   reference to lookup
+     * @return the {@link java.util.concurrent.ScheduledExecutorService} or <tt>null</tt>
if not found
+     */
+    ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef);
+
+    /**
+     * Creates a new thread pool using the default thread pool profile.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newDefaultThreadPool(Object source, String name);
+
+    /**
+     * Creates a new thread pool based on the given profile id.
+     *
+     * @param source                the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name                  name which is appended to the thread name
+     * @param threadPoolProfileId   id of the thread pool profile to use for creating the
thread pool
+     * @return the created thread pool, or <tt>null</tt> if there was no thread
pool profile with that given id.
+     */
+    ExecutorService newThreadPool(Object source, String name, String threadPoolProfileId);
+
+    /**
+     * Creates a new cached thread pool.
+     * <p/>
+     * <b>Important:</b> Using cached thread pool is discouraged as they have
no upper bound and can overload the JVM.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newCachedThreadPool(Object source, String name);
+
+    /**
+     * Creates a new scheduled thread pool.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @return the created thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
+
+    /**
+     * Creates a new scheduled thread pool.
+     * <p/>
+     * Will use the pool size from the default thread pool profile
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(Object source, String name);
+
+    /**
+     * Creates a new fixed thread pool.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @return the created thread pool
+     */
+    ExecutorService newFixedThreadPool(Object source, String name, int poolSize);
+
+    /**
+     * Creates a new single-threaded thread pool. This is often used for background threads.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newSingleThreadExecutor(Object source, String name);
+
+    /**
+     * Creates a new synchronous thread pool, which executes the task in the caller thread
(no task queue).
+     *
+     * @param source      the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newSynchronousThreadPool(Object source, String name);
+
+    /**
+     * Creates a new custom thread pool.
+     * <p/>
+     * Will by default use 60 seconds for keep alive time for idle threads.
+     * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy}
as rejection handler
+     *
+     * @param source        the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name          name which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @return the created thread pool
+     */
+    ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize);
+
+    /**
+     * Creates a new custom thread pool.
+     * <p/>
+     * Will by default use 60 seconds for keep alive time for idle threads.
+     * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy}
as rejection handler
+     *
+     * @param source        the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name          name which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @param maxQueueSize  the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt>
or <tt>-1</tt> to indicate unbounded
+     * @return the created thread pool
+     */
+    ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize,
int maxQueueSize);
+
+    /**
+     * Creates a new custom thread pool.
+     *
+     * @param source                     the source object, usually it should be <tt>this</tt>
passed in as parameter
+     * @param name                       name which is appended to the thread name
+     * @param corePoolSize               the core pool size
+     * @param maxPoolSize                the maximum pool size
+     * @param keepAliveTime              keep alive time for idle threads
+     * @param timeUnit                   time unit for keep alive time
+     * @param maxQueueSize               the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt>
or <tt>-1</tt> to indicate unbounded
+     * @param rejectedExecutionHandler   the handler for tasks which cannot be executed by
the thread pool.
+     *                                   If <tt>null</tt> is provided then {@link
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} is used.
+     * @param daemon                     whether or not the created threads are daemon
threads
+     * @return the created thread pool
+     */
+    ExecutorService newThreadPool(Object source, final String name, int corePoolSize, int
maxPoolSize,
+                                  long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                                  RejectedExecutionHandler rejectedExecutionHandler, boolean
daemon);
+
+    /**
+     * Shutdown the given executor service.
+     *
+     * @param executorService the executor service to shutdown
+     * @see java.util.concurrent.ExecutorService#shutdown()
+     */
+    void shutdown(ExecutorService executorService);
+
+    /**
+     * Shutdown now the given executor service.
+     *
+     * @param executorService the executor service to shutdown now
+     * @return list of tasks that never commenced execution
+     * @see java.util.concurrent.ExecutorService#shutdownNow()
+     */
+    List<Runnable> shutdownNow(ExecutorService executorService);
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=1157189&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
Fri Aug 12 17:04:23 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+
+/**
+ * Unit test to ensure the {@link org.apache.camel.spi.ExecutorServiceStrategy} still
+ * works to keep backwards compatibility.
+ *
+ * @version 
+ */
+public class DefaultExecutorServiceStrategyTest extends ContextTestSupport {
+
+    public void testGetThreadNameDefaultPattern() throws Exception {
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertTrue(foo.startsWith("Camel (" + context.getName() + ") thread "));
+        assertTrue(foo.endsWith("foo"));
+        assertTrue(bar.startsWith("Camel (" + context.getName() + ") thread "));
+        assertTrue(bar.endsWith("bar"));
+    }
+
+    public void testGetThreadNameCustomPattern() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${name}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertTrue(foo.startsWith("#"));
+        assertTrue(foo.endsWith(" - foo"));
+        assertTrue(bar.startsWith("#"));
+        assertTrue(bar.endsWith(" - bar"));
+    }
+
+    public void testGetThreadNameCustomPatternCamelId() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("#${camelId} - #${counter}
- ${name}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertTrue(foo.startsWith("#" + context.getName() + " - #"));
+        assertTrue(foo.endsWith(" - foo"));
+        assertTrue(bar.startsWith("#" + context.getName() + " - #"));
+        assertTrue(bar.endsWith(" - bar"));
+    }
+
+    public void testGetThreadNameCustomPatternWithDollar() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("Hello - ${name}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo$bar");
+
+        assertEquals("Hello - foo$bar", foo);
+    }
+
+    public void testGetThreadNameCustomPatternLongName() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${longName}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo?beer=Carlsberg");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertTrue(foo.startsWith("#"));
+        assertTrue(foo.endsWith(" - foo?beer=Carlsberg"));
+        assertTrue(bar.startsWith("#"));
+        assertTrue(bar.endsWith(" - bar"));
+    }
+
+    public void testGetThreadNameCustomPatternWithParameters() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${name}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo?beer=Carlsberg");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertTrue(foo.startsWith("#"));
+        assertTrue(foo.endsWith(" - foo"));
+        assertTrue(bar.startsWith("#"));
+        assertTrue(bar.endsWith(" - bar"));
+    }
+
+    public void testGetThreadNameCustomPatternNoCounter() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("Cool ${name}");
+        String foo = context.getExecutorServiceStrategy().getThreadName("foo");
+        String bar = context.getExecutorServiceStrategy().getThreadName("bar");
+
+        assertNotSame(foo, bar);
+        assertEquals("Cool foo", foo);
+        assertEquals("Cool bar", bar);
+    }
+
+    public void testGetThreadNameCustomPatternInvalid() throws Exception {
+        context.getExecutorServiceStrategy().setThreadNamePattern("Cool ${xxx}");
+        try {
+            context.getExecutorServiceStrategy().getThreadName("foo");
+            fail("Should thrown an exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Pattern is invalid: Cool ${xxx}", e.getMessage());
+        }
+
+        // reset it so we can shutdown properly
+        context.getExecutorServiceStrategy().setThreadNamePattern("Camel Thread ${counter}
- ${name}");
+    }
+
+    public void testDefaultThreadPool() throws Exception {
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this,
"myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use default settings
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(10, executor.getCorePoolSize());
+        assertEquals(20, executor.getMaximumPoolSize());
+        assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(1000, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
+    public void testDefaultUnboundedQueueThreadPool() throws Exception {
+        ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport("custom");
+        custom.setPoolSize(10);
+        custom.setMaxPoolSize(30);
+        custom.setKeepAliveTime(50L);
+        custom.setMaxQueueSize(-1);
+
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+        assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this,
"myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use the settings from the custom default profile
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(10, executor.getCorePoolSize());
+        assertEquals(30, executor.getMaximumPoolSize());
+        assertEquals(50, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
+    public void testCustomDefaultThreadPool() throws Exception {
+        ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport("custom");
+        custom.setKeepAliveTime(20L);
+        custom.setMaxPoolSize(40);
+        custom.setPoolSize(5);
+        custom.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom);
+        assertEquals(true, custom.isDefaultProfile().booleanValue());
+
+        ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this,
"myPool");
+        assertEquals(false, myPool.isShutdown());
+
+        // should use the settings from the custom default profile
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool;
+        assertEquals(5, executor.getCorePoolSize());
+        assertEquals(40, executor.getMaximumPoolSize());
+        assertEquals(20, executor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(2000, executor.getQueue().remainingCapacity());
+
+        context.stop();
+        assertEquals(true, myPool.isShutdown());
+    }
+
+    public void testGetThreadPoolProfile() throws Exception {
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setKeepAliveTime(20L);
+        foo.setMaxPoolSize(40);
+        foo.setPoolSize(5);
+        foo.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+        assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+    }
+
+    public void testTwoGetThreadPoolProfile() throws Exception {
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setKeepAliveTime(20L);
+        foo.setMaxPoolSize(40);
+        foo.setPoolSize(5);
+        foo.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+        ThreadPoolProfileSupport bar = new ThreadPoolProfileSupport("bar");
+        bar.setKeepAliveTime(40L);
+        bar.setMaxPoolSize(5);
+        bar.setPoolSize(1);
+        bar.setMaxQueueSize(100);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(bar);
+
+        assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+        assertSame(bar, context.getExecutorServiceStrategy().getThreadPoolProfile("bar"));
+        assertNotSame(foo, bar);
+
+        assertFalse(context.getExecutorServiceStrategy().getThreadPoolProfile("foo").isDefaultProfile());
+        assertFalse(context.getExecutorServiceStrategy().getThreadPoolProfile("bar").isDefaultProfile());
+    }
+
+    public void testGetThreadPoolProfileInheritDefaultValues() throws Exception {
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setMaxPoolSize(40);
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+        assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this,
"MyPool", "foo");
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
+        assertEquals(40, tp.getMaximumPoolSize());
+        // should inherit the default values
+        assertEquals(10, tp.getCorePoolSize());
+        assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+    }
+
+    public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception {
+        ThreadPoolProfileSupport newDefault = new ThreadPoolProfileSupport("newDefault");
+        newDefault.setKeepAliveTime(30L);
+        newDefault.setMaxPoolSize(50);
+        newDefault.setPoolSize(5);
+        newDefault.setMaxQueueSize(2000);
+        newDefault.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(newDefault);
+
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setMaxPoolSize(25);
+        foo.setPoolSize(1);
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+        assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this,
"MyPool", "foo");
+
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
+        assertEquals(25, tp.getMaximumPoolSize());
+        // core pool size is set on foo itself; the remaining values should be inherited from the default profile
+        assertEquals(1, tp.getCorePoolSize());
+        assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler());
+    }
+
+    public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception {
+        ThreadPoolProfileSupport newDefault = new ThreadPoolProfileSupport("newDefault");
+        // just change the max pool as the default profile should then inherit the old default
profile
+        newDefault.setMaxPoolSize(50);
+        context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(newDefault);
+
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setPoolSize(1);
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+        assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this,
"MyPool", "foo");
+
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor);
+        assertEquals(1, tp.getCorePoolSize());
+        // should inherit the default values
+        assertEquals(50, tp.getMaximumPoolSize());
+        assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+    }
+
+    public void testNewThreadPoolProfile() throws Exception {
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo"));
+
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo");
+        foo.setKeepAliveTime(20L);
+        foo.setMaxPoolSize(40);
+        foo.setPoolSize(5);
+        foo.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+        ExecutorService pool = context.getExecutorServiceStrategy().newThreadPool(this, "Cool",
"foo");
+        assertNotNull(pool);
+
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
+        assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(40, tp.getMaximumPoolSize());
+        assertEquals(5, tp.getCorePoolSize());
+        assertFalse(tp.isShutdown());
+
+        context.stop();
+
+        assertTrue(tp.isShutdown());
+    }
+
+    public void testLookupThreadPoolProfile() throws Exception {
+        ExecutorService pool = context.getExecutorServiceStrategy().lookup(this, "Cool",
"fooProfile");
+        // does not exist yet
+        assertNull(pool);
+
+        assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("fooProfile"));
+
+        ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("fooProfile");
+        foo.setKeepAliveTime(20L);
+        foo.setMaxPoolSize(40);
+        foo.setPoolSize(5);
+        foo.setMaxQueueSize(2000);
+
+        context.getExecutorServiceStrategy().registerThreadPoolProfile(foo);
+
+        pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile");
+        assertNotNull(pool);
+
+        ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool);
+        assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(40, tp.getMaximumPoolSize());
+        assertEquals(5, tp.getCorePoolSize());
+        assertFalse(tp.isShutdown());
+
+        context.stop();
+
+        assertTrue(tp.isShutdown());
+    }
+
+}



Mime
View raw message