camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Charles Moulliard <ch0...@gmail.com>
Subject Re: sjms and multiple threads
Date Mon, 17 Mar 2014 10:27:45 GMT
Commit available here :
https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9


On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch007m@gmail.com>wrote:

> I will commit soon a unit test. That could help us to verify if something
> goes wrong.
>
>
> On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <bengt@rodehav.com> wrote:
>
>> An update.
>>
>> I noticed in the stack trace that it seems to be the *production* of
>> messages that get the exception "Unable to complete sending the message:
>> Only one thread may use a JMS Session at a time." - not the *consumption*.
>> I also only showed you the first part of the route (cause I didn't think
>> the rest mattered). What I do is basically reading xml messages from one
>> queue, transforming them and the sending them to another queue.
>>
>> Moreover, one incoming message may end up creating more than one (in this
>> case 3) out messages. I'm using a splitter to accomplish this. In pseudo
>> code the route looks like this:
>>
>>   from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
>>
>> If I set consumerCount=10 on the consumer endpoint AND use threads(10)
>> then
>> I can see that more threads are created but I get the "Only one thread may
>> use ..." exception. Now, if I also set the producerCount=10 on the
>> producer
>> endpoint then this exception goes away and my route works.
>>
>> Not exactly sure why this works. If anyone could explain the relationship
>> between consumer/producer count and threads I would appreciate it. A
>> theory
>> of mine is that there must be at least as many consumers/producers as
>> there
>> are threads or there is a risk that two threads will try to use the same
>> consumer/producer. If there is a one-to-one relationship between
>> consumer/producer and JMS session then this could explain the exception.
>> But this still sounds weird. If there is a pool of consumers/producers
>> then
>> the thread should try to acquire one from the pool. If no
>> consumer/producer
>> is available then the thread should wait until one is available - not try
>> to use one that is in use by another thread.
>>
>> Also, although I can get my route to work this way, I get no better
>> throughput than if I only use one thread. I get the exact same throughput
>> and my CPU is basically idling. Obviously I'm not doing this right.
>>
>> Do I also need multiple connections to the JMS server? Could this affect
>> the concurrency?
>>
>> /Bengt
>>
>>
>>
>>
>>
>> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <bengt@rodehav.com>:
>>
>> > I've now tried just using consumerCount on the endpoint without using
>> > threads(). However, it doesn't make it multithreaded. Only one thread is
>> > started for the route.
>> >
>> > Any other ideas? Has someone used sjms with a multithreaded consumer?
>> >
>> > /Bengt
>> >
>> >
>> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <bengt@rodehav.com>:
>> >
>> > Ok - thanks. I have tried it but only together with threads(). I didn't
>> >> realize that it might create threads on its own.
>> >>
>> >> /Bengt
>> >> Den 14 mar 2014 17:26 skrev "Claus Ibsen" <claus.ibsen@gmail.com>:
>> >>
>> >> Hi
>> >>>
>> >>> I think you should use the consumerCount option on the endpoint
>> instead
>> >>>
>> >>> On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <bengt@rodehav.com>
>> >>> wrote:
>> >>> > I'm using Camel 2.12.3 and the sjms component for communicating
with
>> >>> > Weblogic JMS.
>> >>> >
>> >>> > Everything works fine when I use a single thread. However, to
>> increase
>> >>> > throughput I want multiple threads to read messages from the queue.
>> >>> >
>> >>> > I've done this by using the "threads()" method:
>> >>> >
>> >>> >   from(sjms:...).threads(10)....
>> >>> >
>> >>> > However I get an exception as follows:
>> >>> >
>> >>> > java.lang.Exception: Unable to complete sending the message: Only
>> one
>> >>> > thread may use a JMS Session at a time.:Only one thread may use
a
>> JMS
>> >>> > Session at a time.
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
>> >>> > at
>> >>> >
>> >>>
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
>> >>> > at
>> >>> >
>> >>>
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
>> >>> > at
>> java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
>> >>> > at
>> >>> >
>> >>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
>> >>> > at
>> >>> >
>> >>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
>> >>> > at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]
>> >>> >
>> >>> > What exactly does this mean and how can I consume messages via
sjms
>> >>> using
>> >>> > multiple threads?
>> >>> >
>> >>> > /Bengt
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Claus Ibsen
>> >>> -----------------
>> >>> Red Hat, Inc.
>> >>> Email: cibsen@redhat.com
>> >>> Twitter: davsclaus
>> >>> Blog: http://davsclaus.com
>> >>> Author of Camel in Action: http://www.manning.com/ibsen
>> >>> Make your Camel applications look hawt, try: http://hawt.io
>> >>>
>> >>
>> >
>>
>
>
>
> --
> Charles Moulliard
> Apache Committer / Architect @RedHat
> Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
>
>


-- 
Charles Moulliard
Apache Committer / Architect @RedHat
Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message