camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1067658 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ main/java/org/apache/camel/component/jms/reply/ test/java/org/apache/camel/component/jms/
Date Sun, 06 Feb 2011 13:24:44 GMT
Author: davsclaus
Date: Sun Feb  6 13:24:44 2011
New Revision: 1067658

URL: http://svn.apache.org/viewvc?rev=1067658&view=rev
Log:
CAMEL-3576: Use Camel managed thread pool for jms consumers, if no custom configured.

Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Sun Feb  6 13:24:44 2011
@@ -35,9 +35,7 @@ import org.springframework.jms.core.JmsO
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.jms.listener.SimpleMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
@@ -349,8 +347,8 @@ public class JmsConfiguration implements
         return template;
     }
 
-    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint)
throws Exception {
-        AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(endpoint);
+    public DefaultMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint)
throws Exception {
+        DefaultMessageListenerContainer container = new JmsMessageListenerContainer(endpoint);
         configureMessageListenerContainer(container, endpoint);
         return container;
     }
@@ -386,8 +384,7 @@ public class JmsConfiguration implements
     }
 
     /**
-     * Sets the connection factory to be used for consuming messages via the
-     * {@link #createMessageListenerContainer(JmsEndpoint)}
+     * Sets the connection factory to be used for consuming messages
      *
      * @param listenerConnectionFactory the connection factory to use for
      *                                  consuming messages
@@ -802,7 +799,7 @@ public class JmsConfiguration implements
     }
 
 
-    protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
+    protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
                                                      JmsEndpoint endpoint) throws Exception
{
         container.setConnectionFactory(getListenerConnectionFactory());
         if (endpoint instanceof DestinationEndpoint) {
@@ -844,69 +841,52 @@ public class JmsConfiguration implements
             container.setMessageSelector(endpoint.getSelector());
         }
 
-        if (container instanceof DefaultMessageListenerContainer) {
-            // this includes DefaultMessageListenerContainer102
-            DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)
container;
-            if (concurrentConsumers >= 0) {
-                listenerContainer.setConcurrentConsumers(concurrentConsumers);
-            }
+        if (concurrentConsumers >= 0) {
+            container.setConcurrentConsumers(concurrentConsumers);
+        }
 
-            if (cacheLevel >= 0) {
-                listenerContainer.setCacheLevel(cacheLevel);
-            } else if (cacheLevelName != null) {
-                listenerContainer.setCacheLevelName(cacheLevelName);
-            } else {
-                listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
-            }
+        if (cacheLevel >= 0) {
+            container.setCacheLevel(cacheLevel);
+        } else if (cacheLevelName != null) {
+            container.setCacheLevelName(cacheLevelName);
+        } else {
+            container.setCacheLevel(defaultCacheLevel(endpoint));
+        }
 
-            if (idleTaskExecutionLimit >= 0) {
-                listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
-            }
-            if (maxConcurrentConsumers > 0) {
-                if (maxConcurrentConsumers < concurrentConsumers) {
-                    throw new IllegalArgumentException("Property maxConcurrentConsumers:
" + maxConcurrentConsumers
-                            + " must be higher than concurrentConsumers: " + concurrentConsumers);
-                }
-                listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
-            }
-            if (maxMessagesPerTask >= 0) {
-                listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
-            }
-            listenerContainer.setPubSubNoLocal(pubSubNoLocal);
-            if (receiveTimeout >= 0) {
-                listenerContainer.setReceiveTimeout(receiveTimeout);
-            }
-            if (recoveryInterval >= 0) {
-                listenerContainer.setRecoveryInterval(recoveryInterval);
-            }
-            if (taskExecutor != null) {
-                listenerContainer.setTaskExecutor(taskExecutor);
-            }
-            PlatformTransactionManager tm = getTransactionManager();
-            if (tm != null && transacted) {
-                listenerContainer.setTransactionManager(tm);
-            } else if (transacted) {
-                throw new IllegalArgumentException("Property transacted is enabled but a
transactionManager was not injected!");
-            }
-            if (transactionName != null) {
-                listenerContainer.setTransactionName(transactionName);
-            }
-            if (transactionTimeout >= 0) {
-                listenerContainer.setTransactionTimeout(transactionTimeout);
-            }
-            if (taskExecutor != null) {
-                listenerContainer.setTaskExecutor(taskExecutor);
-            }
-        } else if (container instanceof SimpleMessageListenerContainer) {
-            // this includes SimpleMessageListenerContainer102
-            SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)
container;
-            if (concurrentConsumers >= 0) {
-                listenerContainer.setConcurrentConsumers(concurrentConsumers);
-            }
-            listenerContainer.setPubSubNoLocal(pubSubNoLocal);
-            if (taskExecutor != null) {
-                listenerContainer.setTaskExecutor(taskExecutor);
+        if (idleTaskExecutionLimit >= 0) {
+            container.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
+        }
+        if (maxConcurrentConsumers > 0) {
+            if (maxConcurrentConsumers < concurrentConsumers) {
+                throw new IllegalArgumentException("Property maxConcurrentConsumers: " +
maxConcurrentConsumers
+                        + " must be higher than concurrentConsumers: " + concurrentConsumers);
             }
+            container.setMaxConcurrentConsumers(maxConcurrentConsumers);
+        }
+        if (maxMessagesPerTask >= 0) {
+            container.setMaxMessagesPerTask(maxMessagesPerTask);
+        }
+        container.setPubSubNoLocal(pubSubNoLocal);
+        if (receiveTimeout >= 0) {
+            container.setReceiveTimeout(receiveTimeout);
+        }
+        if (recoveryInterval >= 0) {
+            container.setRecoveryInterval(recoveryInterval);
+        }
+        if (taskExecutor != null) {
+            container.setTaskExecutor(taskExecutor);
+        }
+        PlatformTransactionManager tm = getTransactionManager();
+        if (tm != null && transacted) {
+            container.setTransactionManager(tm);
+        } else if (transacted) {
+            throw new IllegalArgumentException("Property transacted is enabled but a transactionManager
was not injected!");
+        }
+        if (transactionName != null) {
+            container.setTransactionName(transactionName);
+        }
+        if (transactionTimeout >= 0) {
+            container.setTransactionTimeout(transactionTimeout);
         }
     }
 
@@ -932,10 +912,6 @@ public class JmsConfiguration implements
         }
     }
 
-    public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation(JmsEndpoint
endpoint) {
-        return new JmsMessageListenerContainer(endpoint);
-    }
-
     /**
      * Defaults the JMS cache level if none is explicitly specified.
      * <p/>

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
Sun Feb  6 13:24:44 2011
@@ -22,20 +22,20 @@ import org.apache.camel.FailedToCreateCo
 import org.apache.camel.Processor;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.DefaultConsumer;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 
 /**
- * A {@link org.apache.camel.Consumer} which uses Spring's {@link AbstractMessageListenerContainer}
implementations to consume JMS messages
+ * A {@link org.apache.camel.Consumer} which uses Spring's {@link DefaultMessageListenerContainer}
implementations to consume JMS messages
  *
  * @version $Revision$
  */
 public class JmsConsumer extends DefaultConsumer implements SuspendableService {
-    private AbstractMessageListenerContainer listenerContainer;
+    private DefaultMessageListenerContainer listenerContainer;
     private EndpointMessageListener messageListener;
     private volatile boolean initialized;
 
-    public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer
listenerContainer) {
+    public JmsConsumer(JmsEndpoint endpoint, Processor processor, DefaultMessageListenerContainer
listenerContainer) {
         super(endpoint, processor);
         this.listenerContainer = listenerContainer;
         this.listenerContainer.setMessageListener(getEndpointMessageListener());
@@ -45,7 +45,7 @@ public class JmsConsumer extends Default
         return (JmsEndpoint) super.getEndpoint();
     }
 
-    public AbstractMessageListenerContainer getListenerContainer() throws Exception {
+    public DefaultMessageListenerContainer getListenerContainer() throws Exception {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
@@ -66,7 +66,7 @@ public class JmsConsumer extends Default
 
     protected void createMessageListenerContainer() throws Exception {
         listenerContainer = getEndpoint().createMessageListenerContainer();
-        getEndpoint().configureListenerContainer(listenerContainer);
+        getEndpoint().configureListenerContainer(listenerContainer, this);
         listenerContainer.setMessageListener(getEndpointMessageListener());
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Sun Feb  6 13:24:44 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.ConnectionFactory;
@@ -45,10 +46,13 @@ import org.apache.camel.impl.DefaultExch
 import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.core.JmsOperations;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -62,6 +66,7 @@ import org.springframework.transaction.P
  */
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport,
Service {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -148,15 +153,15 @@ public class JmsEndpoint extends Default
     }
 
     public JmsConsumer createConsumer(Processor processor) throws Exception {
-        AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this);
+        DefaultMessageListenerContainer listenerContainer = createMessageListenerContainer();
         return createConsumer(processor, listenerContainer);
     }
 
-    public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception
{
+    public DefaultMessageListenerContainer createMessageListenerContainer() throws Exception
{
         return configuration.createMessageListenerContainer(this);
     }
 
-    public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer)
{
+    public void configureListenerContainer(DefaultMessageListenerContainer listenerContainer,
JmsConsumer consumer) {
         if (destinationName != null) {
             listenerContainer.setDestinationName(destinationName);
         } else if (destination != null) {
@@ -170,6 +175,29 @@ public class JmsEndpoint extends Default
             }
         }
         listenerContainer.setPubSubDomain(pubSubDomain);
+
+        if (configuration.getTaskExecutor() != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using custom TaskExecutor: " + configuration.getTaskExecutor()
+ " on listener container: " + listenerContainer);
+            }
+            listenerContainer.setTaskExecutor(configuration.getTaskExecutor());
+        } else {
+            // include destination name as part of thread name
+            String name = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
+            // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing
+            ExecutorService executor = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(consumer,
name);
+            listenerContainer.setTaskExecutor(executor);
+        }
+    }
+
+    /**
+     * Gets the destination name which was configured from the endpoint uri.
+     *
+     * @return the destination name resolved from the endpoint uri
+     */
+    public String getEndpointConfiguredDestinationName() {
+        String remainder = ObjectHelper.after(getEndpointKey(), "//");
+        return JmsMessageHelper.normalizeDestinationName(remainder);
     }
 
     /**
@@ -180,9 +208,10 @@ public class JmsEndpoint extends Default
      * @return a newly created consumer
      * @throws Exception if the consumer cannot be created
      */
-    public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer
listenerContainer) throws Exception {
-        configureListenerContainer(listenerContainer);
-        return new JmsConsumer(this, processor, listenerContainer);
+    public JmsConsumer createConsumer(Processor processor, DefaultMessageListenerContainer
listenerContainer) throws Exception {
+        JmsConsumer consumer = new JmsConsumer(this, processor, listenerContainer);
+        configureListenerContainer(listenerContainer, consumer);
+        return consumer;
     }
 
     @Override

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Sun Feb  6 13:24:44 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms.r
 
 import java.math.BigInteger;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -25,7 +26,6 @@ import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.util.IntrospectionSupport;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
@@ -206,6 +206,10 @@ public class PersistentQueueReplyManager
         }
         if (endpoint.getTaskExecutor() != null) {
             answer.setTaskExecutor(endpoint.getTaskExecutor());
+        } else {
+            String name = "PersistentReplyManager[" + endpoint.getEndpointConfiguredDestinationName()
+ "]";
+            ExecutorService executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this,
name);
+            answer.setTaskExecutor(executor);
         }
 
         return answer;

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Sun Feb  6 13:24:44 2011
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import java.util.concurrent.ExecutorService;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -122,6 +123,10 @@ public class TemporaryQueueReplyManager 
         }
         if (endpoint.getTaskExecutor() != null) {
             answer.setTaskExecutor(endpoint.getTaskExecutor());
+        } else {
+            String name = "TemporaryReplyManager[" + endpoint.getEndpointConfiguredDestinationName()
+ "]";
+            ExecutorService executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this,
name);
+            answer.setTaskExecutor(executor);
         }
 
         return answer;

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java?rev=1067658&r1=1067657&r2=1067658&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleInOnlyNoMutateTest.java
Sun Feb  6 13:24:44 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
 
 import javax.jms.ConnectionFactory;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;



Mime
View raw message