Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 36E18829D for ; Fri, 12 Aug 2011 17:04:52 +0000 (UTC) Received: (qmail 30828 invoked by uid 500); 12 Aug 2011 17:04:52 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 30778 invoked by uid 500); 12 Aug 2011 17:04:51 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 30757 invoked by uid 99); 12 Aug 2011 17:04:51 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2011 17:04:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Aug 2011 17:04:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1274123889B3 for ; Fri, 12 Aug 2011 17:04:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1157189 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/impl/ Date: Fri, 12 Aug 2011 17:04:23 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110812170424.1274123889B3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Fri Aug 12 17:04:23 2011 New Revision: 1157189 URL: http://svn.apache.org/viewvc?rev=1157189&view=rev Log: CAMEL-4298: Added ExecutorServiceStrategy 
to be backwards compatible. Marked it as @deprecated. This ensures 3rd party components still work with future Camel releases. Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=1157189&r1=1157188&r2=1157189&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Fri Aug 12 17:04:23 2011 @@ -33,6 +33,7 @@ import org.apache.camel.spi.DataFormatRe import org.apache.camel.spi.Debugger; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.ExecutorServiceStrategy; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; import org.apache.camel.spi.InflightRepository; @@ -925,6 +926,15 @@ public interface CamelContext extends Su ExecutorServiceManager getExecutorServiceManager(); /** + * Gets the current {@link org.apache.camel.spi.ExecutorServiceStrategy} + * + * @return the manager + * @deprecated use {@link #getExecutorServiceManager()} + */ + @Deprecated + ExecutorServiceStrategy getExecutorServiceStrategy(); + + /** * Sets a custom {@link org.apache.camel.spi.ExecutorServiceManager} * * @param 
executorServiceManager the custom manager Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1157189&r1=1157188&r2=1157189&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Aug 12 17:04:23 2011 @@ -93,6 +93,7 @@ import org.apache.camel.spi.Debugger; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.ExecutorServiceStrategy; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; import org.apache.camel.spi.InflightRepository; @@ -2280,6 +2281,12 @@ public class DefaultCamelContext extends return this.executorServiceManager; } + public ExecutorServiceStrategy getExecutorServiceStrategy() { + // its okay to create a new instance as its stateless, and just delegate + // ExecutorServiceManager which is the new API + return new DefaultExecutorServiceStrategy(this); + } + public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { this.executorServiceManager = executorServiceManager; } Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1157189&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (added) +++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Aug 12 17:04:23 2011 @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ExecutorServiceStrategy; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.util.concurrent.SynchronousExecutorService; + +/** + * @deprecated use {@link org.apache.camel.spi.ExecutorServiceManager} instead, will be removed in a future Camel release + */ +@Deprecated +public class DefaultExecutorServiceStrategy extends ServiceSupport implements ExecutorServiceStrategy { + + // delegate to ExecutorServiceManager + + private final CamelContext camelContext; + + public DefaultExecutorServiceStrategy(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public void registerThreadPoolProfile(ThreadPoolProfile profile) { + 
camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile); + } + + public ThreadPoolProfile getThreadPoolProfile(String id) { + return camelContext.getExecutorServiceManager().getThreadPoolProfile(id); + } + + public ThreadPoolProfile getDefaultThreadPoolProfile() { + return camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile(); + } + + public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) { + camelContext.getExecutorServiceManager().setDefaultThreadPoolProfile(defaultThreadPoolProfile); + } + + public String getThreadName(String name) { + return camelContext.getExecutorServiceManager().resolveThreadName(name); + } + + public String getThreadNamePattern() { + return camelContext.getExecutorServiceManager().getThreadNamePattern(); + } + + public void setThreadNamePattern(String pattern) throws IllegalArgumentException { + camelContext.getExecutorServiceManager().setThreadNamePattern(pattern); + } + + public ExecutorService lookup(Object source, String name, String executorServiceRef) { + ExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ExecutorService.class); + if (answer == null) { + // try to see if we got a thread pool profile with that id + answer = newThreadPool(source, name, executorServiceRef); + } + return answer; + } + + public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef) { + ScheduledExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class); + if (answer == null) { + ThreadPoolProfile profile = getThreadPoolProfile(executorServiceRef); + if (profile != null) { + Integer poolSize = profile.getPoolSize(); + if (poolSize == null) { + poolSize = getDefaultThreadPoolProfile().getPoolSize(); + } + answer = newScheduledThreadPool(source, name, poolSize); + } + } + return answer; + } + + public ExecutorService newDefaultThreadPool(Object source, String name) { + return 
camelContext.getExecutorServiceManager().newDefaultThreadPool(source, name); + } + + public ExecutorService newThreadPool(Object source, String name, String threadPoolProfileId) { + return camelContext.getExecutorServiceManager().newThreadPool(source, name, threadPoolProfileId); + } + + public ExecutorService newCachedThreadPool(Object source, String name) { + return camelContext.getExecutorServiceManager().newCachedThreadPool(source, name); + } + + public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { + return camelContext.getExecutorServiceManager().newScheduledThreadPool(source, name, poolSize); + } + + public ScheduledExecutorService newScheduledThreadPool(Object source, String name) { + return camelContext.getExecutorServiceManager().newDefaultScheduledThreadPool(source, name); + } + + public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { + return camelContext.getExecutorServiceManager().newFixedThreadPool(source, name, poolSize); + } + + public ExecutorService newSingleThreadExecutor(Object source, String name) { + return camelContext.getExecutorServiceManager().newSingleThreadExecutor(source, name); + } + + public ExecutorService newSynchronousThreadPool(Object source, String name) { + return new SynchronousExecutorService(); + } + + public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) { + return camelContext.getExecutorServiceManager().newThreadPool(source, name, corePoolSize, maxPoolSize); + } + + public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize) { + // use a profile with the settings + ThreadPoolProfile profile = new ThreadPoolProfile(); + profile.setPoolSize(corePoolSize); + profile.setMaxPoolSize(maxPoolSize); + profile.setMaxQueueSize(maxQueueSize); + + return camelContext.getExecutorServiceManager().newThreadPool(source, name, profile); + } + + public 
ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, + long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + RejectedExecutionHandler rejectedExecutionHandler, boolean daemon) { + // use a profile with the settings + ThreadPoolProfile profile = new ThreadPoolProfile(); + profile.setPoolSize(corePoolSize); + profile.setMaxPoolSize(maxPoolSize); + profile.setMaxQueueSize(maxQueueSize); + profile.setKeepAliveTime(keepAliveTime); + profile.setTimeUnit(timeUnit); + + // must cast to ThreadPoolExecutor to be able to set the rejected execution handler + ThreadPoolExecutor answer = (ThreadPoolExecutor) camelContext.getExecutorServiceManager().newThreadPool(source, name, profile); + answer.setRejectedExecutionHandler(rejectedExecutionHandler); + return answer; + } + + public void shutdown(ExecutorService executorService) { + camelContext.getExecutorServiceManager().shutdown(executorService); + } + + public List shutdownNow(ExecutorService executorService) { + return camelContext.getExecutorServiceManager().shutdownNow(executorService); + } + + protected void doStart() throws Exception { + // noop + } + + protected void doStop() throws Exception { + // noop + } +} Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=1157189&r1=1157188&r2=1157189&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java Fri Aug 12 17:04:23 2011 @@ -21,7 +21,7 @@ import org.apache.camel.spi.ThreadPoolPr /** * Use {@link ThreadPoolProfile} instead * - * @deprecated use {@link ThreadPoolProfile} instead. 
+ * @deprecated use {@link ThreadPoolProfile} instead, will be removed in a future Camel release */ @Deprecated public class ThreadPoolProfileSupport extends ThreadPoolProfile { Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1157189&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Aug 12 17:04:23 2011 @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ShutdownableService; + +/** + * Strategy to create thread pools. + *

+ * This strategy is pluggable so you can plug in a custom provider, for example if you want to leverage + * the WorkManager for a J2EE server. + *

+ * This strategy has fine-grained methods for creating various thread pools; however, custom strategies + * do not have to create exactly those kinds of pools. Feel free to return a shared or different kind of pool. + *

+ * However there are two types of pools: regular and scheduled. + *

+ * If you use the newXXX methods to create thread pools, then Camel will by default take care of + * shutting down those created pools when {@link org.apache.camel.CamelContext} is shutting down. + * + * @deprecated use {@link ExecutorServiceManager} instead, will be removed in a future Camel release + */ +@Deprecated +public interface ExecutorServiceStrategy extends ShutdownableService { + + /** + * Registers the given thread pool profile + * + * @param profile the profile + */ + void registerThreadPoolProfile(ThreadPoolProfile profile); + + /** + * Gets the thread pool profile by the given id + * + * @param id id of the thread pool profile to get + * @return the found profile, or null if not found + */ + ThreadPoolProfile getThreadPoolProfile(String id); + + /** + * Gets the default thread pool profile + * + * @return the default profile which is never null + */ + ThreadPoolProfile getDefaultThreadPoolProfile(); + + /** + * Sets the default thread pool profile + * + * @param defaultThreadPoolProfile the new default thread pool profile + */ + void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile); + + /** + * Creates a full thread name + * + * @param name name which is appended to the full thread name + * @return the full thread name + */ + String getThreadName(String name); + + /** + * Gets the thread name pattern used for creating the full thread name. + * + * @return the pattern + */ + String getThreadNamePattern(); + + /** + * Sets the thread name pattern used for creating the full thread name. + *

+ * The default pattern is: Camel (${camelId}) thread #${counter} - ${name} + *

+ * Where ${camelId} is the name of the {@link org.apache.camel.CamelContext} + *
and ${counter} is a unique incrementing counter. + *
and ${name} is the regular thread name. + *
You can also use ${longName} is the long thread name which can includes endpoint parameters etc. + * + * @param pattern the pattern + * @throws IllegalArgumentException if the pattern is invalid. + */ + void setThreadNamePattern(String pattern) throws IllegalArgumentException; + + /** + * Lookup a {@link java.util.concurrent.ExecutorService} from the {@link org.apache.camel.spi.Registry} + * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param executorServiceRef reference to lookup + * @return the {@link java.util.concurrent.ExecutorService} or null if not found + */ + ExecutorService lookup(Object source, String name, String executorServiceRef); + + /** + * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry} + * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param executorServiceRef reference to lookup + * @return the {@link java.util.concurrent.ScheduledExecutorService} or null if not found + */ + ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef); + + /** + * Creates a new thread pool using the default thread pool profile. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ExecutorService newDefaultThreadPool(Object source, String name); + + /** + * Creates a new thread pool using based on the given profile id. 
+ * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param threadPoolProfileId id of the thread pool profile to use for creating the thread pool + * @return the created thread pool, or null if there was no thread pool profile with that given id. + */ + ExecutorService newThreadPool(Object source, String name, String threadPoolProfileId); + + /** + * Creates a new cached thread pool. + *

+ * Important: Using cached thread pool is discouraged as they have no upper bound and can overload the JVM. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ExecutorService newCachedThreadPool(Object source, String name); + + /** + * Creates a new scheduled thread pool. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param poolSize the core pool size + * @return the created thread pool + */ + ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize); + + /** + * Creates a new scheduled thread pool. + *

+ * Will use the pool size from the default thread pool profile + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ScheduledExecutorService newScheduledThreadPool(Object source, String name); + + /** + * Creates a new fixed thread pool. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param poolSize the core pool size + * @return the created thread pool + */ + ExecutorService newFixedThreadPool(Object source, String name, int poolSize); + + /** + * Creates a new single-threaded thread pool. This is often used for background threads. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ExecutorService newSingleThreadExecutor(Object source, String name); + + /** + * Creates a new synchronous thread pool, which executes the task in the caller thread (no task queue). + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ExecutorService newSynchronousThreadPool(Object source, String name); + + /** + * Creates a new custom thread pool. + *

+ * Will by default use 60 seconds for keep alive time for idle threads. + * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as rejection handler + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param corePoolSize the core pool size + * @param maxPoolSize the maximum pool size + * @return the created thread pool + */ + ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize); + + /** + * Creates a new custom thread pool. + *

+ * Will by default use 60 seconds for keep alive time for idle threads. + * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as rejection handler + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param corePoolSize the core pool size + * @param maxPoolSize the maximum pool size + * @param maxQueueSize the maximum number of tasks in the queue, use Integer.MAX_INT or -1 to indicate unbounded + * @return the created thread pool + */ + ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize); + + /** + * Creates a new custom thread pool. + * + * @param source the source object, usually it should be this passed in as parameter + * @param name name which is appended to the thread name + * @param corePoolSize the core pool size + * @param maxPoolSize the maximum pool size + * @param keepAliveTime keep alive time for idle threads + * @param timeUnit time unit for keep alive time + * @param maxQueueSize the maximum number of tasks in the queue, use Integer.MAX_INT or -1 to indicate unbounded + * @param rejectedExecutionHandler the handler for tasks which cannot be executed by the thread pool. + * If null is provided then {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} is used. + * @param daemon whether or not the created threads is daemon or not + * @return the created thread pool + */ + ExecutorService newThreadPool(Object source, final String name, int corePoolSize, int maxPoolSize, + long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, + RejectedExecutionHandler rejectedExecutionHandler, boolean daemon); + + /** + * Shutdown the given executor service. 
+ * + * @param executorService the executor service to shutdown + * @see java.util.concurrent.ExecutorService#shutdown() + */ + void shutdown(ExecutorService executorService); + + /** + * Shutdown now the given executor service. + * + * @param executorService the executor service to shutdown now + * @return list of tasks that never commenced execution + * @see java.util.concurrent.ExecutorService#shutdownNow() + */ + List shutdownNow(ExecutorService executorService); + +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=1157189&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Fri Aug 12 17:04:23 2011 @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ThreadPoolRejectedPolicy; + +/** + * Unit test to ensure the {@link org.apache.camel.spi.ExecutorServiceStrategy} still + * works to keep backwards compatibility. + * + * @version + */ +public class DefaultExecutorServiceStrategyTest extends ContextTestSupport { + + public void testGetThreadNameDefaultPattern() throws Exception { + String foo = context.getExecutorServiceStrategy().getThreadName("foo"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertTrue(foo.startsWith("Camel (" + context.getName() + ") thread ")); + assertTrue(foo.endsWith("foo")); + assertTrue(bar.startsWith("Camel (" + context.getName() + ") thread ")); + assertTrue(bar.endsWith("bar")); + } + + public void testGetThreadNameCustomPattern() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${name}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertTrue(foo.startsWith("#")); + assertTrue(foo.endsWith(" - foo")); + assertTrue(bar.startsWith("#")); + assertTrue(bar.endsWith(" - bar")); + } + + public void testGetThreadNameCustomPatternCamelId() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("#${camelId} - #${counter} - ${name}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertTrue(foo.startsWith("#" + context.getName() + " - #")); + assertTrue(foo.endsWith(" - foo")); + assertTrue(bar.startsWith("#" + context.getName() + " - #")); + 
assertTrue(bar.endsWith(" - bar")); + } +/** Expects a dollar sign inside the supplied name to show up unchanged in the generated thread name. */ + public void testGetThreadNameCustomPatternWithDollar() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("Hello - ${name}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo$bar"); + + assertEquals("Hello - foo$bar", foo); + } +/** Expects the longName placeholder to keep the full supplied name, query part included. */ + public void testGetThreadNameCustomPatternLongName() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${longName}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo?beer=Carlsberg"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertTrue(foo.startsWith("#")); + assertTrue(foo.endsWith(" - foo?beer=Carlsberg")); + assertTrue(bar.startsWith("#")); + assertTrue(bar.endsWith(" - bar")); + } +/** Expects the name placeholder to drop everything from the question mark onwards. */ + public void testGetThreadNameCustomPatternWithParameters() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("#${counter} - ${name}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo?beer=Carlsberg"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertTrue(foo.startsWith("#")); + assertTrue(foo.endsWith(" - foo")); + assertTrue(bar.startsWith("#")); + assertTrue(bar.endsWith(" - bar")); + } +/** Expects a pattern without the counter placeholder to produce exactly the pattern text with the name filled in. */ + public void testGetThreadNameCustomPatternNoCounter() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("Cool ${name}"); + String foo = context.getExecutorServiceStrategy().getThreadName("foo"); + String bar = context.getExecutorServiceStrategy().getThreadName("bar"); + + assertNotSame(foo, bar); + assertEquals("Cool foo", foo); + assertEquals("Cool bar", bar); + } +/** Expects getThreadName to fail with IllegalArgumentException, reporting the pattern as invalid, when the pattern uses the xxx placeholder. */ + public void testGetThreadNameCustomPatternInvalid() throws Exception { + context.getExecutorServiceStrategy().setThreadNamePattern("Cool ${xxx}"); + try { + context.getExecutorServiceStrategy().getThreadName("foo"); + fail("Should thrown 
an exception"); + } catch (IllegalArgumentException e) { + assertEquals("Pattern is invalid: Cool ${xxx}", e.getMessage()); + } + + // reset it so we can shutdown properly + context.getExecutorServiceStrategy().setThreadNamePattern("Camel Thread ${counter} - ${name}"); + } +/** Expects newDefaultThreadPool to give a ThreadPoolExecutor with core size 10, max size 20, 60 second keep alive and 1000 free queue slots, shut down once the context stops. */ + public void testDefaultThreadPool() throws Exception { + ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool"); + assertEquals(false, myPool.isShutdown()); + + // should use default settings + ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; + assertEquals(10, executor.getCorePoolSize()); + assertEquals(20, executor.getMaximumPoolSize()); + assertEquals(60, executor.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(1000, executor.getQueue().remainingCapacity()); + + context.stop(); + assertEquals(true, myPool.isShutdown()); + } +/** Expects a default profile with max queue size -1 to give a queue reporting Integer.MAX_VALUE remaining capacity. */ + public void testDefaultUnboundedQueueThreadPool() throws Exception { + ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport("custom"); + custom.setPoolSize(10); + custom.setMaxPoolSize(30); + custom.setKeepAliveTime(50L); + custom.setMaxQueueSize(-1); + + context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom); + assertEquals(true, custom.isDefaultProfile().booleanValue()); + + ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool"); + assertEquals(false, myPool.isShutdown()); + + // should use the settings of the custom default profile + ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; + assertEquals(10, executor.getCorePoolSize()); + assertEquals(30, executor.getMaximumPoolSize()); + assertEquals(50, executor.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity()); + + context.stop(); + assertEquals(true, myPool.isShutdown()); + } +/** Expects newDefaultThreadPool to use the values of a custom profile installed as the default profile. */ + public void testCustomDefaultThreadPool() throws Exception { + ThreadPoolProfileSupport custom = new ThreadPoolProfileSupport("custom"); + 
custom.setKeepAliveTime(20L); + custom.setMaxPoolSize(40); + custom.setPoolSize(5); + custom.setMaxQueueSize(2000); + + context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(custom); + assertEquals(true, custom.isDefaultProfile().booleanValue()); + + ExecutorService myPool = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "myPool"); + assertEquals(false, myPool.isShutdown()); + + // should use the settings of the custom default profile + ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; + assertEquals(5, executor.getCorePoolSize()); + assertEquals(40, executor.getMaximumPoolSize()); + assertEquals(20, executor.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(2000, executor.getQueue().remainingCapacity()); + + context.stop(); + assertEquals(true, myPool.isShutdown()); + } +/** Expects getThreadPoolProfile to return null before registration and the same instance afterwards. */ + public void testGetThreadPoolProfile() throws Exception { + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setKeepAliveTime(20L); + foo.setMaxPoolSize(40); + foo.setPoolSize(5); + foo.setMaxQueueSize(2000); + + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + + assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + } +/** Expects two registered profiles to be returned under their own ids and neither to be flagged as the default profile. */ + public void testTwoGetThreadPoolProfile() throws Exception { + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setKeepAliveTime(20L); + foo.setMaxPoolSize(40); + foo.setPoolSize(5); + foo.setMaxQueueSize(2000); + + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + + ThreadPoolProfileSupport bar = new ThreadPoolProfileSupport("bar"); + bar.setKeepAliveTime(40L); + bar.setMaxPoolSize(5); + bar.setPoolSize(1); + bar.setMaxQueueSize(100); + + context.getExecutorServiceStrategy().registerThreadPoolProfile(bar); + + assertSame(foo, 
context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + assertSame(bar, context.getExecutorServiceStrategy().getThreadPoolProfile("bar")); + assertNotSame(foo, bar); + + assertFalse(context.getExecutorServiceStrategy().getThreadPoolProfile("foo").isDefaultProfile()); + assertFalse(context.getExecutorServiceStrategy().getThreadPoolProfile("bar").isDefaultProfile()); + } +/** Expects a profile that only sets max pool size to get core size 10, 60 second keep alive and CallerRunsPolicy, which the test treats as inherited defaults. */ + public void testGetThreadPoolProfileInheritDefaultValues() throws Exception { + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setMaxPoolSize(40); + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this, "MyPool", "foo"); + ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); + assertEquals(40, tp.getMaximumPoolSize()); + // should inherit the default values + assertEquals(10, tp.getCorePoolSize()); + assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS)); + assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler()); + } +/** Expects a profile that sets only its pool sizes to pick up keep alive time and rejected policy from a replaced default profile. */ + public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception { + ThreadPoolProfileSupport newDefault = new ThreadPoolProfileSupport("newDefault"); + newDefault.setKeepAliveTime(30L); + newDefault.setMaxPoolSize(50); + newDefault.setPoolSize(5); + newDefault.setMaxQueueSize(2000); + newDefault.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); + context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(newDefault); + + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setMaxPoolSize(25); + foo.setPoolSize(1); + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + 
assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this, "MyPool", "foo"); + + ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); + assertEquals(25, tp.getMaximumPoolSize()); + // core pool size is set on foo itself; keep alive time and rejected policy should be inherited from the new default + assertEquals(1, tp.getCorePoolSize()); + assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS)); + assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler()); + } +/** Expects max pool size 50 to come from a new default profile that sets nothing else, while keep alive time and rejected policy match the values asserted for the original default. */ + public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception { + ThreadPoolProfileSupport newDefault = new ThreadPoolProfileSupport("newDefault"); + // just change the max pool as the default profile should then inherit the old default profile + newDefault.setMaxPoolSize(50); + context.getExecutorServiceStrategy().setDefaultThreadPoolProfile(newDefault); + + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setPoolSize(1); + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + assertSame(foo, context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ExecutorService executor = context.getExecutorServiceStrategy().newThreadPool(this, "MyPool", "foo"); + + ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); + assertEquals(1, tp.getCorePoolSize()); + // should inherit the default values + assertEquals(50, tp.getMaximumPoolSize()); + assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS)); + assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler()); + } +/** Expects newThreadPool to build a pool from the registered foo profile and the pool to be shut down once the context stops. */ + public void testNewThreadPoolProfile() throws Exception { + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("foo")); + + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("foo"); + foo.setKeepAliveTime(20L); + 
foo.setMaxPoolSize(40); + foo.setPoolSize(5); + foo.setMaxQueueSize(2000); + + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + + ExecutorService pool = context.getExecutorServiceStrategy().newThreadPool(this, "Cool", "foo"); + assertNotNull(pool); + + ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool); + assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(40, tp.getMaximumPoolSize()); + assertEquals(5, tp.getCorePoolSize()); + assertFalse(tp.isShutdown()); + + context.stop(); + + assertTrue(tp.isShutdown()); + } +/** Expects lookup to return null until the fooProfile profile is registered, and afterwards a pool carrying that profile's values which is shut down once the context stops. */ + public void testLookupThreadPoolProfile() throws Exception { + ExecutorService pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile"); + // does not exist yet + assertNull(pool); + + assertNull(context.getExecutorServiceStrategy().getThreadPoolProfile("fooProfile")); + + ThreadPoolProfileSupport foo = new ThreadPoolProfileSupport("fooProfile"); + foo.setKeepAliveTime(20L); + foo.setMaxPoolSize(40); + foo.setPoolSize(5); + foo.setMaxQueueSize(2000); + + context.getExecutorServiceStrategy().registerThreadPoolProfile(foo); + + pool = context.getExecutorServiceStrategy().lookup(this, "Cool", "fooProfile"); + assertNotNull(pool); + + ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool); + assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(40, tp.getMaximumPoolSize()); + assertEquals(5, tp.getCorePoolSize()); + assertFalse(tp.isShutdown()); + + context.stop(); + + assertTrue(tp.isShutdown()); + } + +}