camel-users mailing list archives

From Jason Dillon <ja...@planet57.com>
Subject How to get full control over threads used by JMS/ActiveMQ component?
Date Thu, 24 Nov 2011 22:19:42 GMT
I've been trying for a while now to figure out how to get full control over the threads used
by the JMS/ActiveMQ components... and so far I've had zero luck.  Seems like no matter what
I try, the JMS component always goes and makes a bunch of threads.

I'm testing out with some simple routes:

from("jms:topic:test.>").bean(new Tester("root"));
from("jms:topic:test.a").bean(new Tester("a"));
from("jms:topic:test.b").bean(new Tester("b"));
from("jms:topic:test.c").bean(new Tester("c"));
from("jms:topic:test.d").bean(new Tester("d"));
from("jms:topic:test.e").bean(new Tester("e"));

Tester simply logs the message body with a prefix of its constructor argument.
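
For reference, a minimal sketch of what such a Tester could look like. The actual class is not shown here, so the method names (format, onMessage) and the use of java.util.logging are assumptions made for illustration:

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the Tester bean used in the routes above.
class Tester {
    private static final Logger LOG = Logger.getLogger(Tester.class.getName());

    private final String prefix;

    Tester(String prefix) {
        this.prefix = prefix;
    }

    // Builds the log line; kept separate so it can be checked without a logger.
    String format(Object body) {
        return prefix + ": " + body;
    }

    // Single public method taking the message body. Also logs the current
    // thread name, which helps when tracking down which thread ran the bean.
    public void onMessage(Object body) {
        LOG.info("[" + Thread.currentThread().getName() + "] " + format(body));
    }
}
```

To use something like this as a Camel bean it would likely need to be a public class in its own file.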

The "jms" component is basically:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(connectionFactory);
singleConnectionFactory.setReconnectOnException(true);
ActiveMQConfiguration jmsConfig = new ActiveMQConfiguration();
jmsConfig.setConnectionFactory(singleConnectionFactory);
jmsConfig.setConsumerType(ConsumerType.Simple);
ActiveMQComponent jms = new ActiveMQComponent(jmsConfig);

I then start everything up and send off some messages:

for (int i=0; i<10; i++) {
    producer.sendBody("jms:topic:test.a", "a");
    producer.sendBody("jms:topic:test.b", "b");
    producer.sendBody("jms:topic:test.c", "c");
    producer.sendBody("jms:topic:test.d", "d");
    producer.sendBody("jms:topic:test.e", "e");
    producer.sendBody("jms:topic:test.f", "f");
    producer.sendBody("jms:topic:test.g", "g");
}

This ends up creating a bunch of JmsConsumer threads:

"Camel (camel-1) thread #0 - JmsConsumer[test.a]"@1,678 in group "main": RUNNING
"Camel (camel-1) thread #1 - JmsConsumer[test.>]"@1,681 in group "main": WAIT
"Camel (camel-1) thread #10 - JmsConsumer[test.>]"@1,717 in group "main": WAIT
"Camel (camel-1) thread #11 - JmsConsumer[test.>]"@1,722 in group "main": WAIT
"Camel (camel-1) thread #12 - JmsConsumer[test.>]"@1,725 in group "main": WAIT
"Camel (camel-1) thread #13 - JmsConsumer[test.a]"@1,726 in group "main": WAIT
"Camel (camel-1) thread #15 - JmsConsumer[test.>]"@1,730 in group "main": WAIT
"Camel (camel-1) thread #16 - JmsConsumer[test.b]"@1,731 in group "main": WAIT
"Camel (camel-1) thread #17 - JmsConsumer[test.>]"@1,735 in group "main": WAIT
"Camel (camel-1) thread #18 - JmsConsumer[test.c]"@1,736 in group "main": WAIT
"Camel (camel-1) thread #19 - JmsConsumer[test.>]"@1,737 in group "main": WAIT
"Camel (camel-1) thread #2 - JmsConsumer[test.>]"@1,701 in group "main": WAIT
"Camel (camel-1) thread #20 - JmsConsumer[test.d]"@1,738 in group "main": WAIT
"Camel (camel-1) thread #21 - JmsConsumer[test.>]"@1,739 in group "main": WAIT
"Camel (camel-1) thread #22 - JmsConsumer[test.e]"@1,740 in group "main": WAIT
"Camel (camel-1) thread #23 - JmsConsumer[test.>]"@1,741 in group "main": WAIT
"Camel (camel-1) thread #24 - JmsConsumer[test.>]"@1,742 in group "main": WAIT
"Camel (camel-1) thread #25 - JmsConsumer[test.>]"@1,743 in group "main": WAIT
"Camel (camel-1) thread #26 - JmsConsumer[test.a]"@1,744 in group "main": WAIT
"Camel (camel-1) thread #27 - JmsConsumer[test.>]"@1,746 in group "main": WAIT
"Camel (camel-1) thread #28 - JmsConsumer[test.b]"@1,747 in group "main": WAIT
"Camel (camel-1) thread #29 - JmsConsumer[test.>]"@1,748 in group "main": WAIT
"Camel (camel-1) thread #3 - JmsConsumer[test.b]"@1,702 in group "main": WAIT
"Camel (camel-1) thread #30 - JmsConsumer[test.c]"@1,749 in group "main": WAIT
"Camel (camel-1) thread #31 - JmsConsumer[test.d]"@1,752 in group "main": WAIT
"Camel (camel-1) thread #32 - JmsConsumer[test.>]"@1,751 in group "main": WAIT
"Camel (camel-1) thread #33 - JmsConsumer[test.>]"@1,755 in group "main": WAIT
"Camel (camel-1) thread #34 - JmsConsumer[test.e]"@1,756 in group "main": WAIT
"Camel (camel-1) thread #35 - JmsConsumer[test.>]"@1,758 in group "main": WAIT
"Camel (camel-1) thread #36 - JmsConsumer[test.>]"@1,759 in group "main": WAIT
"Camel (camel-1) thread #37 - JmsConsumer[test.>]"@1,760 in group "main": WAIT
"Camel (camel-1) thread #39 - JmsConsumer[test.>]"@1,762 in group "main": WAIT
"Camel (camel-1) thread #4 - JmsConsumer[test.>]"@1,705 in group "main": WAIT
"Camel (camel-1) thread #5 - JmsConsumer[test.c]"@1,706 in group "main": WAIT
"Camel (camel-1) thread #6 - JmsConsumer[test.>]"@1,709 in group "main": WAIT
"Camel (camel-1) thread #7 - JmsConsumer[test.d]"@1,710 in group "main": WAIT
"Camel (camel-1) thread #8 - JmsConsumer[test.>]"@1,713 in group "main": WAIT
"Camel (camel-1) thread #9 - JmsConsumer[test.e]"@1,714 in group "main": WAIT

And the ExecutorServiceStrategy looks like it's got a pool for each of these components + one
more:

	• executorServiceStrategy = {org.apache.camel.impl.DefaultExecutorServiceStrategy@1771}
		• executorServices = {java.util.ArrayList@2666} size = 8
			• [0] = {java.util.concurrent.ThreadPoolExecutor@1773}
			• [1] = {java.util.concurrent.ThreadPoolExecutor@2687}
			• [2] = {java.util.concurrent.ScheduledThreadPoolExecutor@2688}
			• [3] = {java.util.concurrent.ThreadPoolExecutor@2689}
			• [4] = {java.util.concurrent.ThreadPoolExecutor@2690}
			• [5] = {java.util.concurrent.ThreadPoolExecutor@2691}
			• [6] = {java.util.concurrent.ThreadPoolExecutor@2692}
			• [7] = {java.util.concurrent.ThreadPoolExecutor@2693}
 
Does each route get its own thread pool?

I've tried to set the default thread pool profile to something small, like:

ThreadPoolProfile defaultProfile = new ThreadPoolProfileSupport("defaultThreadPoolProfile2");
defaultProfile.setPoolSize(1);
defaultProfile.setMaxPoolSize(1);
defaultProfile.setKeepAliveTime(60L);
defaultProfile.setTimeUnit(TimeUnit.SECONDS);
defaultProfile.setMaxQueueSize(1000);
defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
executorServiceStrategy.setDefaultThreadPoolProfile(defaultProfile);

But this does not seem to have much effect on the number of threads created.

I've tried setting the executorServiceRef explicitly via threads().executorServiceRef() but
that also doesn't seem to affect the configuration used to create the JmsConsumer threads.

 * * *

I'm trying to set up some topic subscriptions (possibly many of them) which flow over a single
connection using the JMS message listener's session thread (not a polling receive as with the
Default consumer type), and to get all of those consumers to hand off to a single small thread
pool (~5 threads) for processing.
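
To make the threading model I'm after concrete, here is a rough sketch in plain java.util.concurrent. It uses no Camel or JMS API at all; SharedPoolDispatcher and its methods are hypothetical names, and dispatch() stands in for whatever a message listener would call from its session thread:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Camel or ActiveMQ: every subscription
// hands its messages to one shared, fixed-size pool.
class SharedPoolDispatcher {
    private final ExecutorService pool;
    private final Set<String> workerNames = ConcurrentHashMap.newKeySet();
    private final AtomicInteger processed = new AtomicInteger();

    SharedPoolDispatcher(int poolSize) {
        // Fixed number of threads backed by an unbounded queue.
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    // Called from the listener thread; queues the work and returns immediately.
    void dispatch(String topic, String body) {
        pool.execute(() -> {
            workerNames.add(Thread.currentThread().getName());
            // Real processing of (topic, body) would go here.
            processed.incrementAndGet();
        });
    }

    int processedCount() {
        return processed.get();
    }

    // Number of distinct pool threads that actually ran a task.
    int distinctWorkerThreads() {
        return workerNames.size();
    }

    // Stops accepting work and waits for queued tasks to finish.
    boolean shutdownAndWait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a pool size of 5, the number of processing threads stays at 5 or fewer no matter how many topics feed into dispatch(). A bounded queue with a caller-runs rejection policy would be the alternative if back-pressure on the listener thread is wanted.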

Is that possible with Camel & the JMS/ActiveMQ components, or do I need to write a custom
JMS component which provides the desired thread semantics? I may have to write a custom component
anyway, to get rid of the ~5 MB of Spring dependencies which these components pull in, but
that's another matter.  I'd still really like to know how to get full control over the threads
used by the JMS/ActiveMQ components as they are today.

Can anyone drop some knowledge?

--jason


