camel-dev mailing list archives

From Raul Kripalani <r...@evosent.com>
Subject Re: svn commit: r1442002 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/
Date Tue, 05 Feb 2013 23:40:59 GMT
Hi Claus,

Setting this attribute at the JmsConfiguration level was already
possible.
I have also added a setter to JmsComponent for further convenience and
exposed the option as a read-only @ManagedAttribute.
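
To illustrate the override behaviour discussed below (configure once on the
component, override per endpoint), here is a minimal sketch. It does not use
the real camel-jms classes: ExecutorType, ComponentConfig and EndpointSketch
are hypothetical stand-ins, and the assumption that each endpoint works on
its own copy of the component configuration is mine, made for illustration.

```java
// Hypothetical stand-ins for illustration only; not the camel-jms API.
enum ExecutorType { ThreadPool, SimpleAsync }

final class ComponentConfig {
    // null means "not set", i.e. keep the legacy behaviour
    private ExecutorType defaultTaskExecutorType;

    ExecutorType getDefaultTaskExecutorType() {
        return defaultTaskExecutorType;
    }

    void setDefaultTaskExecutorType(ExecutorType type) {
        this.defaultTaskExecutorType = type;
    }

    // Each endpoint gets its own copy, so endpoint-level changes stay local.
    ComponentConfig copy() {
        ComponentConfig copy = new ComponentConfig();
        copy.defaultTaskExecutorType = this.defaultTaskExecutorType;
        return copy;
    }
}

final class EndpointSketch {
    private final ComponentConfig configuration;

    EndpointSketch(ComponentConfig componentConfig) {
        // Inherit the component-level settings at creation time.
        this.configuration = componentConfig.copy();
    }

    ExecutorType getDefaultTaskExecutorType() {
        return configuration.getDefaultTaskExecutorType();
    }

    // An endpoint-level setting overrides the inherited value.
    void setDefaultTaskExecutorType(ExecutorType type) {
        configuration.setDefaultTaskExecutorType(type);
    }
}
```

With this shape, setting ThreadPool once on the shared configuration applies
to every endpoint created afterwards, while a single endpoint can still
switch back to SimpleAsync without affecting the others.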

Regards,

*Raúl Kripalani*
Apache Camel Committer
Enterprise Architect, Program Manager, Open Source Integration specialist
http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>

On Mon, Feb 4, 2013 at 8:18 AM, Claus Ibsen <claus.ibsen@gmail.com> wrote:

> Hi
>
> I think you should make it possible to configure this once on
> JmsConfiguration / JmsComponent, so you don't need to configure it
> per endpoint.
> If you do configure it per endpoint, that setting overrides the
> configuration inherited from the component.
>
> Also, the option could be exposed as a read-only JMX attribute by
> adding a @ManagedAttribute annotation to the getter in the endpoint.
>
> On Mon, Feb 4, 2013 at 2:32 AM,  <raulk@apache.org> wrote:
> > Author: raulk
> > Date: Mon Feb  4 01:32:58 2013
> > New Revision: 1442002
> >
> > URL: http://svn.apache.org/viewvc?rev=1442002&view=rev
> > Log:
> > CAMEL-5974 Allow specifying the default type of TaskExecutor used by the DMLC (camel-jms)
> >
> > Added:
> >     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> >     camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> > Modified:
> >     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> >     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> >     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> >     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
> Mon Feb  4 01:32:58 2013
> > @@ -20,6 +20,7 @@ import org.apache.camel.util.concurrent.
> >  import org.springframework.core.task.SimpleAsyncTaskExecutor;
> >  import org.springframework.core.task.TaskExecutor;
> >  import org.springframework.jms.listener.DefaultMessageListenerContainer;
> > +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
> >
> >  /**
> >   * The default {@link DefaultMessageListenerContainer container} which
> listen for messages
> > @@ -46,19 +47,38 @@ public class DefaultJmsMessageListenerCo
> >
> >      /**
> >       * Create a default TaskExecutor. Called if no explicit
> TaskExecutor has been specified.
> > -     * <p>The default implementation builds a {@link
> org.springframework.core.task.SimpleAsyncTaskExecutor}
> > -     * with the specified bean name and using Camel's {@link
> org.apache.camel.spi.ExecutorServiceManager}
> > +     * <p />
> > +     * The type of {@link TaskExecutor} will depend on the value of
> > +     * {@link JmsConfiguration#getDefaultTaskExecutorType()}. For more
> details, refer to the Javadoc of
> > +     * {@link DefaultTaskExecutorType}.
> > +     * <p />
> > +     * In all cases, it uses the specified bean name and Camel's {@link
> org.apache.camel.spi.ExecutorServiceManager}
> >       * to resolve the thread name.
> > -     * @see
> org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
> > +     * @see
> JmsConfiguration#setDefaultTaskExecutorType(DefaultTaskExecutorType)
> > +     * @see ThreadPoolTaskExecutor#setBeanName(String)
> >       */
> >      @Override
> >      protected TaskExecutor createDefaultTaskExecutor() {
> >          String pattern =
> endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
> > -        String beanName = getBeanName();
> > +        String beanName = getBeanName() == null ?
> endpoint.getThreadName() : getBeanName();
> >
> > -        SimpleAsyncTaskExecutor answer = new
> SimpleAsyncTaskExecutor(beanName);
> > -        answer.setThreadFactory(new CamelThreadFactory(pattern,
> beanName, true));
> > -        return answer;
> > +        if (endpoint.getDefaultTaskExecutorType() ==
> DefaultTaskExecutorType.ThreadPool) {
> > +            ThreadPoolTaskExecutor answer = new
> ThreadPoolTaskExecutor();
> > +            answer.setBeanName(beanName);
> > +            answer.setThreadFactory(new CamelThreadFactory(pattern,
> beanName, true));
> > +            answer.setCorePoolSize(endpoint.getConcurrentConsumers());
> > +            // Direct hand-off mode. Do not queue up tasks: assign it
> to a thread immediately.
> > +            // We set no upper-bound on the thread pool (no
> maxPoolSize) as it's already implicitly constrained by
> > +            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC
> will only grow up to a level of concurrency as
> > +            // defined by maxConcurrentConsumers).
> > +            answer.setQueueCapacity(0);
> > +            answer.initialize();
> > +            return answer;
> > +        } else {
> > +            SimpleAsyncTaskExecutor answer = new
> SimpleAsyncTaskExecutor(beanName);
> > +            answer.setThreadFactory(new CamelThreadFactory(pattern,
> beanName, true));
> > +            return answer;
> > +        }
> >      }
> > -
> > +
> >  }
> >
> > Added:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java?rev=1442002&view=auto
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> (added)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultTaskExecutorType.java
> Mon Feb  4 01:32:58 2013
> > @@ -0,0 +1,49 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.camel.component.jms;
> > +
> > +import java.util.concurrent.ThreadPoolExecutor;
> > +
> > +import org.springframework.core.task.SimpleAsyncTaskExecutor;
> > +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
> > +
> > +/**
> > + * Hints what type of default task executor our {@link
> DefaultJmsMessageListenerContainer} should use.
> > + * @since 2.10.3
> > + */
> > +public enum DefaultTaskExecutorType {
> > +
> > +    /**
> > +     * Use a {@link ThreadPoolTaskExecutor} as the underlying task
> executor for consuming messages.
> > +     * It will be configured with these attributes:
> > +     * <p />
> > +     * <li>
> > +     * <ul>{@code corePoolSize} = concurrentConsumers</ul>
> > +     * <ul>{@code queueSize} = 0 (to use the 'direct handoff' strategy
> for growing the thread pool,
> > +     * see Javadoc of {@link ThreadPoolExecutor}.</ul>
> > +     * <ul>{@code maxPoolSize}, default value, i.e. no upper bound (as
> concurrency should be limited by
> > +     * the endpoint's maxConcurrentConsumers, not by the thread
> pool).</ul>
> > +     * </li>
> > +     */
> > +    ThreadPool,
> > +
> > +    /**
> > +     * Use a {@link SimpleAsyncTaskExecutor} as the underlying task
> executor for consuming messages.
> > +     * (Legacy mode - default behaviour before version 2.10.3).
> > +     */
> > +    SimpleAsync
> > +}
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
> Mon Feb  4 01:32:58 2013
> > @@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
> >
> >  import java.util.Map;
> >  import java.util.concurrent.ExecutorService;
> > +
> >  import javax.jms.ConnectionFactory;
> >  import javax.jms.ExceptionListener;
> >  import javax.jms.Session;
> > @@ -183,7 +184,7 @@ public class JmsComponent extends Defaul
> >      public void setCacheLevelName(String cacheName) {
> >          getConfiguration().setCacheLevelName(cacheName);
> >      }
> > -
> > +
> >      public void setReplyToCacheLevelName(String cacheName) {
> >          getConfiguration().setReplyToCacheLevelName(cacheName);
> >      }
> > @@ -235,7 +236,7 @@ public class JmsComponent extends Defaul
> >      public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
> >
>  getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
> >      }
> > -
> > +
> >      public void setIdleConsumerLimit(int idleConsumerLimit) {
> >          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
> >      }
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
> Mon Feb  4 01:32:58 2013
> > @@ -49,7 +49,7 @@ import org.springframework.util.ErrorHan
> >  import static
> org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
> >
> >  /**
> > - * @version
> > + * @version
> >   */
> >  public class JmsConfiguration implements Cloneable {
> >
> > @@ -137,6 +137,7 @@ public class JmsConfiguration implements
> >      private boolean allowNullBody = true;
> >      private MessageListenerContainerFactory
> messageListenerContainerFactory;
> >      private boolean includeSentJMSMessageID;
> > +    private DefaultTaskExecutorType defaultTaskExecutorType;
> >
> >      public JmsConfiguration() {
> >      }
> > @@ -386,7 +387,7 @@ public class JmsConfiguration implements
> >          case Default:
> >              return new DefaultJmsMessageListenerContainer(endpoint);
> >          case Custom:
> > -            return getCustomMessageListenerContainer(endpoint);
> > +            return getCustomMessageListenerContainer(endpoint);
> >          default:
> >              throw new IllegalArgumentException("Unknown consumer type:
> " + consumerType);
> >          }
> > @@ -1313,4 +1314,16 @@ public class JmsConfiguration implements
> >      public void setIncludeSentJMSMessageID(boolean
> includeSentJMSMessageID) {
> >          this.includeSentJMSMessageID = includeSentJMSMessageID;
> >      }
> > +
> > +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> > +        return defaultTaskExecutorType;
> > +    }
> > +
> > +    /**
> > +     * Indicates what type of {@link TaskExecutor} to use by default
> for JMS consumers.
> > +     * Refer to the documentation of {@link DefaultTaskExecutorType}
> for available options.
> > +     */
> > +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType
> defaultTaskExecutorType) {
> > +        this.defaultTaskExecutorType = defaultTaskExecutorType;
> > +    }
> >  }
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1442002&r1=1442001&r2=1442002&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
> Mon Feb  4 01:32:58 2013
> > @@ -62,7 +62,7 @@ import org.springframework.util.ErrorHan
> >  /**
> >   * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
> >   *
> > - * @version
> > + * @version
> >   */
> >  @ManagedResource(description = "Managed JMS Endpoint")
> >  public class JmsEndpoint extends DefaultEndpoint implements
> HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
> > @@ -176,24 +176,32 @@ public class JmsEndpoint extends Default
> >          listenerContainer.setPubSubDomain(pubSubDomain);
> >
> >          // include destination name as part of thread and transaction
> name
> > -        String consumerName = "JmsConsumer[" +
> getEndpointConfiguredDestinationName() + "]";
> > +        String consumerName = getThreadName();
> >
> >          if (configuration.getTaskExecutor() != null) {
> >              if (log.isDebugEnabled()) {
> >                  log.debug("Using custom TaskExecutor: {} on listener
> container: {}", configuration.getTaskExecutor(), listenerContainer);
> >              }
> >              setContainerTaskExecutor(listenerContainer,
> configuration.getTaskExecutor());
> > -        } else {
> > +        } else if ((listenerContainer instanceof
> DefaultJmsMessageListenerContainer &&
> configuration.getDefaultTaskExecutorType() == null)
> > +                || !(listenerContainer instanceof
> DefaultJmsMessageListenerContainer)) {
> > +            // preserve backwards compatibility if an explicit Default
> TaskExecutor Type was not set;
> > +            // otherwise, defer the creation of the TaskExecutor
> >              // use a cached pool as DefaultMessageListenerContainer
> will throttle pool sizing
> >              ExecutorService executor =
> getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer,
> consumerName);
> >              setContainerTaskExecutor(listenerContainer, executor);
> > +        } else {
> > +            // do nothing, as we're working with a
> DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType,
> > +            // so
> DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle
> the creation
> > +            log.debug("Deferring creation of TaskExecutor for listener
> container: {} as per policy: {}",
> > +                    listenerContainer,
> configuration.getDefaultTaskExecutorType());
> >          }
> > -
> > +
> >          // set a default transaction name if none provided
> >          if (configuration.getTransactionName() == null) {
> >              if (listenerContainer instanceof
> DefaultMessageListenerContainer) {
> >                  ((DefaultMessageListenerContainer)
> listenerContainer).setTransactionName(consumerName);
> > -            }
> > +            }
> >          }
> >      }
> >
> > @@ -271,6 +279,10 @@ public class JmsEndpoint extends Default
> >          return true;
> >      }
> >
> > +    public String getThreadName() {
> > +        return "JmsConsumer[" + getEndpointConfiguredDestinationName()
> + "]";
> > +    }
> > +
> >      // Properties
> >      //
> -------------------------------------------------------------------------
> >
> > @@ -448,7 +460,7 @@ public class JmsEndpoint extends Default
> >      public String getCacheLevelName() {
> >          return getConfiguration().getCacheLevelName();
> >      }
> > -
> > +
> >      @ManagedAttribute
> >      public String getReplyToCacheLevelName() {
> >          return getConfiguration().getReplyToCacheLevelName();
> > @@ -489,7 +501,7 @@ public class JmsEndpoint extends Default
> >      public LoggingLevel getErrorHandlerLoggingLevel() {
> >          return getConfiguration().getErrorHandlerLoggingLevel();
> >      }
> > -
> > +
> >      @ManagedAttribute
> >      public boolean isErrorHandlerLogStackTrace() {
> >          return getConfiguration().isErrorHandlerLogStackTrace();
> > @@ -509,7 +521,7 @@ public class JmsEndpoint extends Default
> >      public int getIdleConsumerLimit() {
> >          return getConfiguration().getIdleConsumerLimit();
> >      }
> > -
> > +
> >      public JmsOperations getJmsOperations() {
> >          return getConfiguration().getJmsOperations();
> >      }
> > @@ -724,7 +736,7 @@ public class JmsEndpoint extends Default
> >      public void setCacheLevelName(String cacheName) {
> >          getConfiguration().setCacheLevelName(cacheName);
> >      }
> > -
> > +
> >      @ManagedAttribute
> >      public void setReplyToCacheLevelName(String cacheName) {
> >          getConfiguration().setReplyToCacheLevelName(cacheName);
> > @@ -795,7 +807,7 @@ public class JmsEndpoint extends Default
> >      public void setIdleConsumerLimit(int idleConsumerLimit) {
> >          getConfiguration().setIdleConsumerLimit(idleConsumerLimit);
> >      }
> > -
> > +
> >      public void setJmsOperations(JmsOperations jmsOperations) {
> >          getConfiguration().setJmsOperations(jmsOperations);
> >      }
> > @@ -1040,7 +1052,7 @@ public class JmsEndpoint extends Default
> >      public void setAllowNullBody(boolean allowNullBody) {
> >          configuration.setAllowNullBody(allowNullBody);
> >      }
> > -
> > +
> >      @ManagedAttribute
> >      public boolean isIncludeSentJMSMessageID() {
> >          return configuration.isIncludeSentJMSMessageID();
> > @@ -1051,6 +1063,14 @@ public class JmsEndpoint extends Default
> >
>  configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID);
> >      }
> >
> > +    public DefaultTaskExecutorType getDefaultTaskExecutorType() {
> > +        return configuration.getDefaultTaskExecutorType();
> > +    }
> > +
> > +    public void setDefaultTaskExecutorType(DefaultTaskExecutorType
> type) {
> > +        configuration.setDefaultTaskExecutorType(type);
> > +    }
> > +
> >      public MessageListenerContainerFactory
> getMessageListenerContainerFactory() {
> >          return configuration.getMessageListenerContainerFactory();
> >      }
> >
> > Added:
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java?rev=1442002&view=auto
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> (added)
> > +++
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDefaultTaskExecutorTypeTest.java
> Mon Feb  4 01:32:58 2013
> > @@ -0,0 +1,150 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.camel.component.jms;
> > +
> > +import java.lang.reflect.InvocationTargetException;
> > +import java.lang.reflect.Method;
> > +import java.util.concurrent.Callable;
> > +import java.util.concurrent.CountDownLatch;
> > +import java.util.concurrent.ExecutorService;
> > +import java.util.concurrent.Executors;
> > +
> > +import javax.jms.ConnectionFactory;
> > +
> > +import org.apache.camel.CamelContext;
> > +import org.apache.camel.builder.RouteBuilder;
> > +import org.apache.camel.test.junit4.CamelTestSupport;
> > +import org.apache.camel.util.concurrent.ThreadHelper;
> > +import org.junit.Test;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import static
> org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> > +
> > +/**
> > + *
> > + */
> > +public class JmsDefaultTaskExecutorTypeTest extends CamelTestSupport {
> > +
> > +    private static final Logger LOG =
> LoggerFactory.getLogger(JmsDefaultTaskExecutorTypeTest.class);
> > +
> > +    @Test
> > +    public void testThreadPoolTaskExecutor() throws Exception {
> > +        context.startRoute("threadPool");
> > +        Long beforeThreadCount = currentThreadCount();
> > +
>  getMockEndpoint("mock:result.threadPool").expectedMessageCount(1000);
> > +        doSendMessages("foo.threadPool", 500, 5,
> DefaultTaskExecutorType.ThreadPool);
> > +        Thread.sleep(100);
> > +        doSendMessages("foo.threadPool", 500, 5,
> DefaultTaskExecutorType.ThreadPool);
> > +        assertMockEndpointsSatisfied();
> > +        Long numberThreadsCreated = currentThreadCount() -
> beforeThreadCount;
> > +        LOG.info("Number of threads created,
> testThreadPoolTaskExecutor: " + numberThreadsCreated);
> > +        assertTrue("Number of threads created should be equal or lower
> than "
> > +                + "100 with ThreadPoolTaskExecutor",
> numberThreadsCreated <= 100);
> > +    }
> > +
> > +    @Test
> > +    public void testSimpleAsyncTaskExecutor() throws Exception {
> > +        context.startRoute("simpleAsync");
> > +        Long beforeThreadCount = currentThreadCount();
> > +
>  getMockEndpoint("mock:result.simpleAsync").expectedMessageCount(1000);
> > +        doSendMessages("foo.simpleAsync", 500, 5,
> DefaultTaskExecutorType.SimpleAsync);
> > +        Thread.sleep(100);
> > +        doSendMessages("foo.simpleAsync", 500, 5,
> DefaultTaskExecutorType.SimpleAsync);
> > +        assertMockEndpointsSatisfied();
> > +        Long numberThreadsCreated = currentThreadCount() -
> beforeThreadCount;
> > +        LOG.info("Number of threads created,
> testSimpleAsyncTaskExecutor: " + numberThreadsCreated);
> > +        assertTrue("Number of threads created should be equal or higher
> than "
> > +                + "800 with SimpleAsyncTaskExecutor",
> numberThreadsCreated >= 800);
> > +    }
> > +
> > +    @Test
> > +    public void testDefaultTaskExecutor() throws Exception {
> > +        context.startRoute("default");
> > +        Long beforeThreadCount = currentThreadCount();
> > +
>  getMockEndpoint("mock:result.default").expectedMessageCount(1000);
> > +        doSendMessages("foo.default", 500, 5, null);
> > +        Thread.sleep(100);
> > +        doSendMessages("foo.default", 500, 5, null);
> > +        assertMockEndpointsSatisfied();
> > +        Long numberThreadsCreated = currentThreadCount() -
> beforeThreadCount;
> > +        LOG.info("Number of threads created, testDefaultTaskExecutor: "
> + numberThreadsCreated);
> > +        assertTrue("Number of threads created should be equal or higher
> than "
> > +                + "800 with default behaviour", numberThreadsCreated >=
> 800);
> > +    }
> > +
> > +    private Long currentThreadCount() throws NoSuchMethodException,
> > +            IllegalAccessException, InvocationTargetException {
> > +        Method m =
> ThreadHelper.class.getDeclaredMethod("nextThreadCounter", (Class<?>[])
> null);
> > +        m.setAccessible(true);
> > +        Long nextThreadCount = (Long) m.invoke(null);
> > +        return nextThreadCount;
> > +    }
> > +
> > +    protected CamelContext createCamelContext() throws Exception {
> > +        CamelContext camelContext = super.createCamelContext();
> > +        ConnectionFactory connectionFactory =
> CamelJmsTestHelper.createConnectionFactory();
> > +        JmsComponent jmsComponent =
> jmsComponentAutoAcknowledge(connectionFactory);
> > +        jmsComponent.getConfiguration().setMaxMessagesPerTask(1);
> > +        jmsComponent.getConfiguration().setIdleTaskExecutionLimit(1);
> > +        jmsComponent.getConfiguration().setConcurrentConsumers(3);
> > +        jmsComponent.getConfiguration().setMaxConcurrentConsumers(10);
> > +        jmsComponent.getConfiguration().setReceiveTimeout(50);
> > +        camelContext.addComponent("activemq", jmsComponent);
> > +        return camelContext;
> > +    }
> > +
> > +    private void doSendMessages(final String queueName, int messages,
> int poolSize,
> > +            final DefaultTaskExecutorType defaultTaskExecutorType)
> throws Exception {
> > +        ExecutorService executor =
> Executors.newFixedThreadPool(poolSize);
> > +        final CountDownLatch latch = new CountDownLatch(messages);
> > +        for (int i = 0; i < messages; i++) {
> > +            final int index = i;
> > +            executor.submit(new Callable<Object>() {
> > +                public Object call() throws Exception {
> > +                    String options = defaultTaskExecutorType == null ?
> "" : "?defaultTaskExecutorType="
> > +                            + defaultTaskExecutorType.toString();
> > +                    template.requestBody("activemq:queue:" + queueName
> + options, "Message " + index);
> > +                    latch.countDown();
> > +                    return null;
> > +                }
> > +            });
> > +        }
> > +        latch.await();
> > +        executor.shutdown();
> > +    }
> > +
> > +    @Override
> > +    protected RouteBuilder createRouteBuilder() throws Exception {
> > +        return new RouteBuilder() {
> > +            @Override
> > +            public void configure() throws Exception {
> > +
>  from("activemq:queue:foo.simpleAsync?defaultTaskExecutorType=SimpleAsync").routeId("simpleAsync").noAutoStartup()
> > +                    .to("mock:result.simpleAsync")
> > +                    .setBody(constant("Reply"));
> > +
> > +
>  from("activemq:queue:foo.threadPool?defaultTaskExecutorType=ThreadPool").routeId("threadPool").noAutoStartup()
> > +                    .to("mock:result.threadPool")
> > +                    .setBody(constant("Reply"));
> > +
> > +
>  from("activemq:queue:foo.default").routeId("default").noAutoStartup()
> > +                    .to("mock:result.default")
> > +                    .setBody(constant("Reply"));
> > +            }
> > +        };
> > +    }
> > +}
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cibsen@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
>
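
For readers following the quoted diff: the ThreadPool option relies on the
"direct hand-off" strategy (queue capacity 0, no upper bound on the pool).
The sketch below reproduces that strategy with plain java.util.concurrent
rather than Spring's ThreadPoolTaskExecutor, so it is an approximation of
the behaviour and not the committed code. DirectHandoffSketch and its method
names are hypothetical, and the 60-second keep-alive is an assumed value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of camel-jms or Spring.
final class DirectHandoffSketch {

    // A SynchronousQueue holds no tasks: every submission is either handed
    // to an idle worker or causes a new thread to be started.
    static ThreadPoolExecutor newDirectHandoffPool(int corePoolSize) {
        return new ThreadPoolExecutor(
                corePoolSize,
                Integer.MAX_VALUE,       // no upper bound on the pool itself
                60L, TimeUnit.SECONDS,   // assumed keep-alive for extra threads
                new SynchronousQueue<Runnable>());
    }

    // Submits `tasks` jobs that all block until released, then reports the
    // largest number of threads the pool ever held.
    static int peakThreads(int corePoolSize, int tasks) {
        ThreadPoolExecutor pool = newDirectHandoffPool(corePoolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            // Unblock the workers and shut the pool down cleanly.
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because nothing is ever queued, the pool grows with the number of tasks that
are running at the same time. In the commit this growth is bounded from the
outside by maxConcurrentConsumers on the DMLC, which is why no maxPoolSize
is set on the executor.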
