camel-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: svn commit: r1442002 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/
Date Mon, 04 Feb 2013 08:18:07 GMT
Hi

I think you should make it possible to configure this once on
JmsConfiguration / JmsComponent, Then you dont need to configure this
per endpoint.
Though if you configure per endpoint then it overrides the
configuration inherited from component.

Also the option could be exposed as a read-only JMX annotation by
adding a @ManagedAttribute to the getter in the endpoint.

On Mon, Feb 4, 2013 at 2:32 AM,  <raulk@apache.org> wrote:
> Author: raulk
> Date: Mon Feb  4 01:32:58 2013
> New Revision: 1442002
>
> URL: http://svn.apache.org/viewvc?rev=1442002&view=rev
> Log:
> CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC (camel-jms)
>
> Added:
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
>     camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> Modified:
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
>
> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
(original)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
Mon Feb  4 01:32:58 2013
> @@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent.
>  import org.springframework.core.task.SimpleAsyncTaskExecutor;
>  import org.springframework.core.task.TaskExecutor;
>  import org.springframework.jms.listener.DefaultMessageListenerContainer;
> +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
>
>  /**
>   * The default {@link DefaultMessageListenerContainer container} which listen for messages
> @@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo
>
>      /**
>       * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
> -     * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
> -     * with the specified bean name and using Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
> +     * <p />
> +     * The type of {@link TaskExecutor} will depend on the value of
> +     * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more details, refer
to the Javadoc of
> +     * {@link DefaultTaskExecutorType}.
> +     * <p />
> +     * In all cases, it uses the specified bean name and Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
>       * to resolve the thread name.
> -     * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
> +     * @see JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType)
> +     * @see ThreadPoolTaskExecutor#setBeanName(String)
>       */
>      @Override
>      protected TaskExecutor createDefaultTaskExecutor() {
>          String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
> -        String beanName = getBeanName();
> +        String beanName = getBeanName() == null ? endpoint.getThreadName() : getBeanName();
>
> -        SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
> -        answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
> -        return answer;
> +        if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool)
{
> +            ThreadPoolTaskExecutor answer = new ThreadPoolTaskExecutor();
> +            answer.setBeanName(beanName);
> +            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
> +            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
> +            // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
> +            // We set no upper-bound on the thread pool (no maxPoolSize) as it's already
implicitly constrained by
> +            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up
to a level of concurrency as
> +            // defined by maxConcurrentConsumers).
> +            answer.setQueueCapacity(0);
> +            answer.initialize();
> +            return answer;
> +        } else {
> +            SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
> +            answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
> +            return answer;
> +        }
>      }
> -
> +
>  }
>
> Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java?rev=1442002&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
(added)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
Mon Feb  4 01:32:58 2013
> @@ -0,0 +1,49 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.util.concurrent.ThreadPoolExecutor;
> +
> +import org.springframework.core.task.SimpleAsyncTaskExecutor;
> +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
> +
> +/**
> + * Hints what type of default task executor our {@link DefaultJmsMessageListenerContainer}
should use.
> + * @since 2.10.3
> + */
> +public enum DefaultTaskExecutorType {
> +
> +    /**
> +     * Use a {@link ThreadPoolTaskExecutor} as the underlying task executor for consuming
messages.
> +     * It will be configured with these attributes:
> +     * <p />
> +     * <li>
> +     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
> +     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy for growing
the thread pool,
> +     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
> +     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as concurrency
should be limited by
> +     * the endpoint's maxConcurrentConsumers, not by the thread pool).</ul>
> +     * </li>
> +     */
> +    ThreadPool,
> +
> +    /**
> +     * Use a {@link SimpleAsyncTaskExecutor} as the underlying task executor for consuming
messages.
> +     * (Legacy mode - default behaviour before version 2.10.3).
> +     */
> +    SimpleAsync
> +}
>
> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
(original)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
Mon Feb  4 01:32:58 2013
> @@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
>
>  import java.util.Map;
>  import java.util.concurrent.ExecutorService;
> +
>  import javax.jms.ConnectionFactory;
>  import javax.jms.ExceptionListener;
>  import javax.jms.Session;
> @@ -183,7 +184,7 @@ public class JmsComponent extends Defaul
>      public void setCacheLevelName(String cacheName) {
>          getConfiguration().setCacheLevelName(cacheName);
>      }
> -
> +
>      public void setReplyToCacheLevelName(String cacheName) {
>          getConfiguration().setReplyToCacheLevelName(cacheName);
>      }
> @@ -235,7 +236,7 @@ public class JmsComponent extends Defaul
>      public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
>          getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
>      }
> -
> +
>      public void setIdleConsumerLimit(int idleConsumerLimit) {
>          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
>      }
>
> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Mon Feb  4 01:32:58 2013
> @@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan
>  import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
>
>  /**
> - * @version
> + * @version
>   */
>  public class JmsConfiguration implements Cloneable {
>
> @@ -137,6 +137,7 @@ public class JmsConfiguration implements
>      private boolean allowNullBody = true;
>      private MessageListenerContainerFactory messageListenerContainerFactory;
>      private boolean includeSentJMSMessageID;
> +    private DefaultTaskExecutorType defaultTaskExecutorType;
>
>      public JmsConfiguration() {
>      }
> @@ -386,7 +387,7 @@ public class JmsConfiguration implements
>          case Default:
>              return new DefaultJmsMessageListenerContainer(endpoint);
>          case Custom:
> -            return getCustomMessageListenerContainer(endpoint);
> +            return getCustomMessageListenerContainer(endpoint);
>          default:
>              throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
>          }
> @@ -1313,4 +1314,16 @@ public class JmsConfiguration implements
>      public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) {
>          this.includeSentJMSMessageID = includeSentJMSMessageID;
>      }
> +
> +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> +        return defaultTaskExecutorType;
> +    }
> +
> +    /**
> +     * Indicates what type of {@link TaskExecutor} to use by default for JMS consumers.
> +     * Refer to the documentation of {@link DefaultTaskExecutorType} for available options.
> +     */
> +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType defaultTaskExecutorType)
{
> +        this.defaultTaskExecutorType = defaultTaskExecutorType;
> +    }
>  }
>
> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Mon Feb  4 01:32:58 2013
> @@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan
>  /**
>   * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
>   *
> - * @version
> + * @version
>   */
>  @ManagedResource(description = "Managed JMS Endpoint")
>  public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware,
MultipleConsumersSupport, Service {
> @@ -176,24 +176,32 @@ public class JmsEndpoint extends Default
>          listenerContainer.setPubSubDomain(pubSubDomain);
>
>          // include destination name as part of thread and transaction name
> -        String consumerName = "JmsConsumer[" + getEndpointConfiguredDestinationName()
+ "]";
> +        String consumerName = getThreadName();
>
>          if (configuration.getTaskExecutor() != null) {
>              if (log.isDebugEnabled()) {
>                  log.debug("Using custom TaskExecutor: {} on listener container: {}",
configuration.getTaskExecutor(), listenerContainer);
>              }
>              setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor());
> -        } else {
> +        } else if ((listenerContainer instanceof DefaultJmsMessageListenerContainer
&& configuration.getDefaultTaskExecutorType() == null)
> +                || !(listenerContainer instanceof DefaultJmsMessageListenerContainer))
{
> +            // preserve backwards compatibility if an explicit Default TaskExecutor
Type was not set;
> +            // otherwise, defer the creation of the TaskExecutor
>              // use a cached pool as DefaultMessageListenerContainer will throttle pool
sizing
>              ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer,
consumerName);
>              setContainerTaskExecutor(listenerContainer, executor);
> +        } else {
> +            // do nothing, as we're working with a DefaultJmsMessageListenerContainer
with an explicit DefaultTaskExecutorType,
> +            // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will
handle the creation
> +            log.debug("Deferring creation of TaskExecutor for listener container: {}
as per policy: {}",
> +                    listenerContainer, configuration.getDefaultTaskExecutorType());
>          }
> -
> +
>          // set a default transaction name if none provided
>          if (configuration.getTransactionName() == null) {
>              if (listenerContainer instanceof DefaultMessageListenerContainer) {
>                  ((DefaultMessageListenerContainer) listenerContainer).setTransactionName(consumerName);
> -            }
> +            }
>          }
>      }
>
> @@ -271,6 +279,10 @@ public class JmsEndpoint extends Default
>          return true;
>      }
>
> +    public String getThreadName() {
> +        return "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
> +    }
> +
>      // Properties
>      // -------------------------------------------------------------------------
>
> @@ -448,7 +460,7 @@ public class JmsEndpoint extends Default
>      public String getCacheLevelName() {
>          return getConfiguration().getCacheLevelName();
>      }
> -
> +
>      @ManagedAttribute
>      public String getReplyToCacheLevelName() {
>          return getConfiguration().getReplyToCacheLevelName();
> @@ -489,7 +501,7 @@ public class JmsEndpoint extends Default
>      public LoggingLevel getErrorHandlerLoggingLevel() {
>          return getConfiguration().getErrorHandlerLoggingLevel();
>      }
> -
> +
>      @ManagedAttribute
>      public boolean isErrorHandlerLogStackTrace() {
>          return getConfiguration().isErrorHandlerLogStackTrace();
> @@ -509,7 +521,7 @@ public class JmsEndpoint extends Default
>      public int getIdleConsumerLimit() {
>          return getConfiguration().getIdleConsumerLimit();
>      }
> -
> +
>      public JmsOperations getJmsOperations() {
>          return getConfiguration().getJmsOperations();
>      }
> @@ -724,7 +736,7 @@ public class JmsEndpoint extends Default
>      public void setCacheLevelName(String cacheName) {
>          getConfiguration().setCacheLevelName(cacheName);
>      }
> -
> +
>      @ManagedAttribute
>      public void setReplyToCacheLevelName(String cacheName) {
>          getConfiguration().setReplyToCacheLevelName(cacheName);
> @@ -795,7 +807,7 @@ public class JmsEndpoint extends Default
>      public void setIdleConsumerLimit(int idleConsumerLimit) {
>          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
>      }
> -
> +
>      public void setJmsOperations(JmsOperations jmsOperations) {
>          getConfiguration().setJmsOperations(jmsOperations);
>      }
> @@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default
>      public void setAllowNullBody(boolean allowNullBody) {
>          configuration.setAllowNullBody(allowNullBody);
>      }
> -
> +
>      @ManagedAttribute
>      public boolean isIncludeSentJMSMessageID() {
>          return configuration.isIncludeSentJMSMessageID();
> @@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default
>          configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID);
>      }
>
> +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> +        return configuration.getDefaultTaskExecutorType();
> +    }
> +
> +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType type) {
> +        configuration.setDefaultTaskExecutorType(type);
> +    }
> +
>      public MessageListenerContainerFactory getMessageListenerContainerFactory() {
>          return configuration.getMessageListenerContainerFactory();
>      }
>
> Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java?rev=1442002&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
(added)
> +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
Mon Feb  4 01:32:58 2013
> @@ -0,0 +1,150 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.lang.reflect.InvocationTargetException;
> +import java.lang.reflect.Method;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +
> +import javax.jms.ConnectionFactory;
> +
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.test.junit4.CamelTestSupport;
> +import org.apache.camel.util.concurrent.ThreadHelper;
> +import org.junit.Test;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> +
> +/**
> + *
> + */
> +public class JmsDefaultTaskExecutorTypeTest extends CamelTestSupport {
> +
> +    private static final Logger LOG = LoggerFactory.getLogger(JmsDefaultTaskExecutorTypeTest.class);
> +
> +    @Test
> +    public void testThreadPoolTaskExecutor() throws Exception {
> +        context.startRoute("threadPool");
> +        Long beforeThreadCount = currentThreadCount();
> +        getMockEndpoint("mock:result.threadPool").expectedMessageCount(1000);
> +        doSendMessages("foo.threadPool", 500, 5, DefaultTaskExecutorType.ThreadPool);
> +        Thread.sleep(100);
> +        doSendMessages("foo.threadPool", 500, 5, DefaultTaskExecutorType.ThreadPool);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testThreadPoolTaskExecutor: " + numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or lower than "
> +                + "100 with ThreadPoolTaskExecutor", numberThreadsCreated <= 100);
> +    }
> +
> +    @Test
> +    public void testSimpleAsyncTaskExecutor() throws Exception {
> +        context.startRoute("simpleAsync");
> +        Long beforeThreadCount = currentThreadCount();
> +        getMockEndpoint("mock:result.simpleAsync").expectedMessageCount(1000);
> +        doSendMessages("foo.simpleAsync", 500, 5, DefaultTaskExecutorType.SimpleAsync);
> +        Thread.sleep(100);
> +        doSendMessages("foo.simpleAsync", 500, 5, DefaultTaskExecutorType.SimpleAsync);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testSimpleAsyncTaskExecutor: " + numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or higher than "
> +                + "800 with SimpleAsyncTaskExecutor", numberThreadsCreated >= 800);
> +    }
> +
> +    @Test
> +    public void testDefaultTaskExecutor() throws Exception {
> +        context.startRoute("default");
> +        Long beforeThreadCount = currentThreadCount();
> +        getMockEndpoint("mock:result.default").expectedMessageCount(1000);
> +        doSendMessages("foo.default", 500, 5, null);
> +        Thread.sleep(100);
> +        doSendMessages("foo.default", 500, 5, null);
> +        assertMockEndpointsSatisfied();
> +        Long numberThreadsCreated = currentThreadCount() - beforeThreadCount;
> +        LOG.info("Number of threads created, testDefaultTaskExecutor: " + numberThreadsCreated);
> +        assertTrue("Number of threads created should be equal or higher than "
> +                + "800 with default behaviour", numberThreadsCreated >= 800);
> +    }
> +
> +    private Long currentThreadCount() throws NoSuchMethodException,
> +            IllegalAccessException, InvocationTargetException {
> +        Method m = ThreadHelper.class.getDeclaredMethod("nextThreadCounter", (Class<?>[])
null);
> +        m.setAccessible(true);
> +        Long nextThreadCount = (Long) m.invoke(null);
> +        return nextThreadCount;
> +    }
> +
> +    protected CamelContext createCamelContext() throws Exception {
> +        CamelContext camelContext = super.createCamelContext();
> +        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
> +        JmsComponent jmsComponent = jmsComponentAutoAcknowledge(connectionFactory);
> +        jmsComponent.getConfiguration().setMaxMessagesPerTask(1);
> +        jmsComponent.getConfiguration().setIdleTaskExecutionLimit(1);
> +        jmsComponent.getConfiguration().setConcurrentConsumers(3);
> +        jmsComponent.getConfiguration().setMaxConcurrentConsumers(10);
> +        jmsComponent.getConfiguration().setReceiveTimeout(50);
> +        camelContext.addComponent("activemq", jmsComponent);
> +        return camelContext;
> +    }
> +
> +    private void doSendMessages(final String queueName, int messages, int poolSize,
> +            final DefaultTaskExecutorType defaultTaskExecutorType) throws Exception
{
> +        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
> +        final CountDownLatch latch = new CountDownLatch(messages);
> +        for (int i = 0; i < messages; i++) {
> +            final int index = i;
> +            executor.submit(new Callable<Object>() {
> +                public Object call() throws Exception {
> +                    String options = defaultTaskExecutorType == null ? "" : "?defaultTaskExecutorType="
> +                            + defaultTaskExecutorType.toString();
> +                    template.requestBody("activemq:queue:" + queueName + options, "Message
" + index);
> +                    latch.countDown();
> +                    return null;
> +                }
> +            });
> +        }
> +        latch.await();
> +        executor.shutdown();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("activemq:queue:foo.simpleAsync?defaultTaskExecutorType=SimpleAsync").routeId("simpleAsync").noAutoStartup()
> +                    .to("mock:result.simpleAsync")
> +                    .setBody(constant("Reply"));
> +
> +                from("activemq:queue:foo.threadPool?defaultTaskExecutorType=ThreadPool").routeId("threadPool").noAutoStartup()
> +                    .to("mock:result.threadPool")
> +                    .setBody(constant("Reply"));
> +
> +                from("activemq:queue:foo.default").routeId("default").noAutoStartup()
> +                    .to("mock:result.default")
> +                    .setBody(constant("Reply"));
> +            }
> +        };
> +    }
> +}
>
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Mime
View raw message