camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ra...@apache.org
Subject svn commit: r1442002 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/
Date Mon, 04 Feb 2013 01:32:59 GMT
Author: raulk
Date: Mon Feb  4 01:32:58 2013
New Revision: 1442002

URL: http://svn.apache.org/viewvc?rev=1442002&view=rev
Log:
CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC (camel-jms)

Added:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442002&r1=1442001&r2=1442002&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
Mon Feb  4 01:32:58 2013
@@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent.
 import org.springframework.core.task.SimpleAsyncTaskExecutor;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 /**
  * The default {@link DefaultMessageListenerContainer container} which listen for messages
@@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo
 
     /**
      * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
-     * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
-     * with the specified bean name and using Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
+     * <p />
+     * The type of {@link TaskExecutor} will depend on the value of
+     * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more details, refer to
the Javadoc of
+     * {@link DefaultTaskExecutorType}.
+     * <p />
+     * In all cases, it uses the specified bean name and Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
      * to resolve the thread name.
-     * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
+     * @see JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType)
+     * @see ThreadPoolTaskExecutor#setBeanName(String)
      */
     @Override
     protected TaskExecutor createDefaultTaskExecutor() {
         String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
-        String beanName = getBeanName();
+        String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName();
 
-        SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
-        answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
-        return answer;
+        if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool)
{
+            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
+            answer.setBeanName(beanName);
+            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
+            // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
+            // We set no upper-bound on the thread pool (no maxPoolSize) as it's already
implicitly constrained by
+            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to
a level of concurrency as
+            // defined by maxConcurrentConsumers).
+            answer.setQueueCapacity(0);
+            answer.initialize();
+            return answer;
+        } else {
+            SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
+            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+            return answer;
+        }
     }
-
+    
 }

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java?rev=1442002&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
(added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
Mon Feb  4 01:32:58 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer}
should use.
+ * @since 2.10.3
+ */
+public enum DefaultTaskExecutorType {
+
+    /**
+     * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming
messages.
+     * It will be configured with these attributes:
+     * <p />
+     * <li>
+     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
+     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing
the thread pool,
+     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
+     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency
should be limited by
+     * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
+     * </li>
+     */
+    ThreadPool,
+
+    /**
+     * Use a {@link SimpleAsyncTaskExecutor} as the underlying task executor for consuming
messages.
+     * (Legacy mode - default behaviour before version 2.10.3).
+     */
+    SimpleAsync
+}

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442002&r1=1442001&r2=1442002&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
Mon Feb  4 01:32:58 2013
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.Session;
@@ -183,7 +184,7 @@ public class JmsComponent extends Defaul
     public void setCacheLevelName(String cacheName) {
         getConfiguration().setCacheLevelName(cacheName);
     }
-    
+
     public void setReplyToCacheLevelName(String cacheName) {
         getConfiguration().setReplyToCacheLevelName(cacheName);
     }
@@ -235,7 +236,7 @@ public class JmsComponent extends Defaul
     public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
         getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
     }
-    
+
     public void setIdleConsumerLimit(int idleConsumerLimit) {
         getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442002&r1=1442001&r2=1442002&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Mon Feb  4 01:32:58 2013
@@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan
 import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
 
 /**
- * @version 
+ * @version
  */
 public class JmsConfiguration implements Cloneable {
 
@@ -137,6 +137,7 @@ public class JmsConfiguration implements
     private boolean allowNullBody = true;
     private MessageListenerContainerFactory messageListenerContainerFactory;
     private boolean includeSentJMSMessageID;
+    private DefaultTaskExecutorType defaultTaskExecutorType;
 
     public JmsConfiguration() {
     }
@@ -386,7 +387,7 @@ public class JmsConfiguration implements
         case Default:
             return new DefaultJmsMessageListenerContainer(endpoint);
         case Custom:
-            return getCustomMessageListenerContainer(endpoint);            
+            return getCustomMessageListenerContainer(endpoint);
         default:
             throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
         }
@@ -1313,4 +1314,16 @@ public class JmsConfiguration implements
     public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) {
         this.includeSentJMSMessageID = includeSentJMSMessageID;
     }
+
+    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
+        return defaultTaskExecutorType;
+    }
+
+    /**
+     * Indicates what type of {@link TaskExecutor} to use by default for JMS consumers.
+     * Refer to the documentation of {@link DefaultTaskExecutorType} for available options.
+     */
+    public void setDefaultTaskExecutorType(DefaultTaskExecutorType defaultTaskExecutorType)
{
+        this.defaultTaskExecutorType = defaultTaskExecutorType;
+    }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442002&r1=1442001&r2=1442002&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Mon Feb  4 01:32:58 2013
@@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan
 /**
  * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
  *
- * @version 
+ * @version
  */
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport,
Service {
@@ -176,24 +176,32 @@ public class JmsEndpoint extends Default
         listenerContainer.setPubSubDomain(pubSubDomain);
 
         // include destination name as part of thread and transaction name
-        String consumerName = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
+        String consumerName = getThreadName();
 
         if (configuration.getTaskExecutor() != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(),
listenerContainer);
             }
             setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor());
-        } else {
+        } else if ((listenerContainer instanceof DefaultJmsMessageListenerContainer &&
configuration.getDefaultTaskExecutorType() == null)
+                || !(listenerContainer instanceof DefaultJmsMessageListenerContainer)) {
+            // preserve backwards compatibility if an explicit Default TaskExecutor Type
was not set;
+            // otherwise, defer the creation of the TaskExecutor
             // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing
             ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer,
consumerName);
             setContainerTaskExecutor(listenerContainer, executor);
+        } else {
+            // do nothing, as we're working with a DefaultJmsMessageListenerContainer with
an explicit DefaultTaskExecutorType,
+            // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle
the creation
+            log.debug("Deferring creation of TaskExecutor for listener container: {} as per
policy: {}", 
+                    listenerContainer, configuration.getDefaultTaskExecutorType());
         }
-        
+
         // set a default transaction name if none provided
         if (configuration.getTransactionName() == null) {
             if (listenerContainer instanceof DefaultMessageListenerContainer) {
                 ((DefaultMessageListenerContainer) listenerContainer).setTransactionName(consumerName);
-            }            
+            }
         }
     }
 
@@ -271,6 +279,10 @@ public class JmsEndpoint extends Default
         return true;
     }
 
+    public String getThreadName() {
+        return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
+    }
+    
     // Properties
     // -------------------------------------------------------------------------
 
@@ -448,7 +460,7 @@ public class JmsEndpoint extends Default
     public String getCacheLevelName() {
         return getConfiguration().getCacheLevelName();
     }
-    
+
     @ManagedAttribute
     public String getReplyToCacheLevelName() {
         return getConfiguration().getReplyToCacheLevelName();
@@ -489,7 +501,7 @@ public class JmsEndpoint extends Default
     public LoggingLevel getErrorHandlerLoggingLevel() {
         return getConfiguration().getErrorHandlerLoggingLevel();
     }
-    
+
     @ManagedAttribute
     public boolean isErrorHandlerLogStackTrace() {
         return getConfiguration().isErrorHandlerLogStackTrace();
@@ -509,7 +521,7 @@ public class JmsEndpoint extends Default
     public int getIdleConsumerLimit() {
         return getConfiguration().getIdleConsumerLimit();
     }
-    
+
     public JmsOperations getJmsOperations() {
         return getConfiguration().getJmsOperations();
     }
@@ -724,7 +736,7 @@ public class JmsEndpoint extends Default
     public void setCacheLevelName(String cacheName) {
         getConfiguration().setCacheLevelName(cacheName);
     }
-    
+
     @ManagedAttribute
     public void setReplyToCacheLevelName(String cacheName) {
         getConfiguration().setReplyToCacheLevelName(cacheName);
@@ -795,7 +807,7 @@ public class JmsEndpoint extends Default
     public void setIdleConsumerLimit(int idleConsumerLimit) {
         getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
     }
-    
+
     public void setJmsOperations(JmsOperations jmsOperations) {
         getConfiguration().setJmsOperations(jmsOperations);
     }
@@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default
     public void setAllowNullBody(boolean allowNullBody) {
         configuration.setAllowNullBody(allowNullBody);
     }
-    
+
     @ManagedAttribute
     public boolean isIncludeSentJMSMessageID() {
         return configuration.isIncludeSentJMSMessageID();
@@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default
         configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID);
     }
 
+    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
+        return configuration.getDefaultTaskExecutorType();
+    }
+
+    public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) {
+        configuration.setDefaultTaskExecutorType(type);
+    }
+
     public MessageListenerContainerFactory getMessageListenerContainerFactory() {
         return configuration.getMessageListenerContainerFactory();
     }

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java?rev=1442002&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
(added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
Mon Feb  4 01:32:58 2013
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.concurrent.ThreadHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class JmsDefaultTaskExecutorTypeTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsDefaultTaskExecutorTypeTest.class);
+    
+    @Test
+    public void testThreadPoolTaskExecutor() throws Exception {
+        context.startRoute("threadPool");
+        Long beforeThreadCount = currentThreadCount();
+        getMockEndpoint("mock:result.threadPool").expectedMessageCount(1000);
+        doSendMessages("foo.threadPool", 500, 5, DefaultTaskExecutorType.ThreadPool);
+        Thread.sleep(100);
+        doSendMessages("foo.threadPool", 500, 5, DefaultTaskExecutorType.ThreadPool);
+        assertMockEndpointsSatisfied();
+        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
+        LOG.info("Number of threads created, testThreadPoolTaskExecutor: " + numberThreadsCreated);
+        assertTrue("Number of threads created should be equal or lower than " 
+                + "100 with ThreadPoolTaskExecutor", numberThreadsCreated <= 100);
+    }
+
+    @Test
+    public void testSimpleAsyncTaskExecutor() throws Exception {
+        context.startRoute("simpleAsync");
+        Long beforeThreadCount = currentThreadCount();
+        getMockEndpoint("mock:result.simpleAsync").expectedMessageCount(1000);
+        doSendMessages("foo.simpleAsync", 500, 5, DefaultTaskExecutorType.SimpleAsync);
+        Thread.sleep(100);
+        doSendMessages("foo.simpleAsync", 500, 5, DefaultTaskExecutorType.SimpleAsync);
+        assertMockEndpointsSatisfied();
+        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
+        LOG.info("Number of threads created, testSimpleAsyncTaskExecutor: " + numberThreadsCreated);
+        assertTrue("Number of threads created should be equal or higher than " 
+                + "800 with SimpleAsyncTaskExecutor", numberThreadsCreated >= 800);
+    }
+
+    @Test
+    public void testDefaultTaskExecutor() throws Exception {
+        context.startRoute("default");
+        Long beforeThreadCount = currentThreadCount();
+        getMockEndpoint("mock:result.default").expectedMessageCount(1000);
+        doSendMessages("foo.default", 500, 5, null);
+        Thread.sleep(100);
+        doSendMessages("foo.default", 500, 5, null);
+        assertMockEndpointsSatisfied();
+        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
+        LOG.info("Number of threads created, testDefaultTaskExecutor: " + numberThreadsCreated);
+        assertTrue("Number of threads created should be equal or higher than " 
+                + "800 with default behaviour", numberThreadsCreated >= 800);
+    }
+    
+    private Long currentThreadCount() throws NoSuchMethodException,
+            IllegalAccessException, InvocationTargetException {
+        Method m = ThreadHelper.class.getDeclaredMethod("nextThreadCounter", (Class<?>[])
null);
+        m.setAccessible(true);
+        Long nextThreadCount = (Long) m.invoke(null);
+        return nextThreadCount;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        JmsComponent jmsComponent = jmsComponentAutoAcknowledge(connectionFactory);
+        jmsComponent.getConfiguration().setMaxMessagesPerTask(1);
+        jmsComponent.getConfiguration().setIdleTaskExecutionLimit(1);
+        jmsComponent.getConfiguration().setConcurrentConsumers(3);
+        jmsComponent.getConfiguration().setMaxConcurrentConsumers(10);
+        jmsComponent.getConfiguration().setReceiveTimeout(50);
+        camelContext.addComponent("activemq", jmsComponent);
+        return camelContext;
+    }
+
+    private void doSendMessages(final String queueName, int messages, int poolSize,
+            final DefaultTaskExecutorType defaultTaskExecutorType) throws Exception {
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        final CountDownLatch latch = new CountDownLatch(messages);
+        for (int i = 0; i < messages; i++) {
+            final int index = i;
+            executor.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    String options = defaultTaskExecutorType == null ? "" : "?defaultTaskExecutorType="

+                            + defaultTaskExecutorType.toString();
+                    template.requestBody("activemq:queue:" + queueName + options, "Message
" + index);
+                    latch.countDown();
+                    return null;
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo.simpleAsync?defaultTaskExecutorType=SimpleAsync").routeId("simpleAsync").noAutoStartup()
+                    .to("mock:result.simpleAsync")
+                    .setBody(constant("Reply"));
+
+                from("activemq:queue:foo.threadPool?defaultTaskExecutorType=ThreadPool").routeId("threadPool").noAutoStartup()
+                    .to("mock:result.threadPool")
+                    .setBody(constant("Reply"));
+
+                from("activemq:queue:foo.default").routeId("default").noAutoStartup()
+                    .to("mock:result.default")
+                    .setBody(constant("Reply"));
+            }
+        };
+    }
+}



Mime
View raw message