camel-users mailing list archives

From boday <ben.o...@initekconsulting.com>
Subject Re: Each concurrentConsumer on JMS Topic receives ALL msgs
Date Thu, 07 Jul 2011 14:35:37 GMT
You can always use ActiveMQ virtual topics instead. Concurrent consumption
then works because the consumers read from queues rather than from the topic
itself. See the Camel Durable Subscriber EIP page for an example.
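
To make the virtual-topic suggestion concrete, here is a minimal sketch of the
endpoint URIs involved. It assumes ActiveMQ's default naming convention (topics
named VirtualTopic.<name>, consumer queues named Consumer.<subscriber>.VirtualTopic.<name>)
and reuses the "jms:" component prefix from the quoted route. The class name,
method names, and the subscriber name "listenerA" are hypothetical, and the
existing topic would have to be renamed to follow the convention.

```java
// Sketch only: builds Camel endpoint URI strings for an ActiveMQ virtual topic.
// Assumes the broker's default VirtualTopic.* / Consumer.*.VirtualTopic.* naming.
final class VirtualTopicUris {
    private VirtualTopicUris() {}

    /** Producer side: publishers keep sending to a topic. */
    static String producerTopic(String name) {
        return "jms:topic:VirtualTopic." + name;
    }

    /** Consumer side: each logical subscriber reads from its own queue. */
    static String consumerQueue(String subscriber, String name, int concurrentConsumers) {
        if (concurrentConsumers < 1) {
            throw new IllegalArgumentException("concurrentConsumers must be >= 1");
        }
        return "jms:queue:Consumer." + subscriber + ".VirtualTopic." + name
                + "?concurrentConsumers=" + concurrentConsumers;
    }

    public static void main(String[] args) {
        System.out.println(producerTopic("esb_test_topic"));
        System.out.println(consumerQueue("listenerA", "esb_test_topic", 5));
    }
}
```

The consumer URI would replace the "from" string in the route. Because it
points at a queue, each message goes to exactly one of the concurrent
consumers instead of to all of them.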

_______________
Ben O'Day

On Jul 7, 2011, at 6:18 AM, "enalposi [via Camel]" <
ml-node+4561025-2099553868-45011@n5.nabble.com> wrote:

Right, if I set concurrentConsumers=1 and add threads(10) while building the
route, I do end up with a thread pool. However, these threads are used
serially.

The configuration is as follows:

        final String from = "jms:topic:esb_test_topic?concurrentConsumers=1";
        final String to   = "bean:jmsTopicListener?method=process";
        RouteBuilder builder = new RouteBuilder(camelCtx_) {
            @Override
            public void configure() throws Exception {
                onException(InvalidDestinationException.class, UnknownReplyMessageException.class)
                        .process(new CamelExceptionHandlerImpl(from, to))
                        .stop();
                from(from).threads(5).setExchangePattern(pattern).to(to);
            }
        };

The 'listener' bean simply logs a message on receipt, calls Thread.sleep for
1 sec, and logs another message. As the output below shows, threads are picked
out of the pool one at a time, with 1 sec gaps.

    public void process(Object _msg) {
        log.info(String.format("Received(|->) [%1$s/%2$s]: %3$s",
                df.format(new Date()), beanName_, _msg));
        if (sleepTime_ > 0) {
            if (_msg instanceof SampleDto) {
                SampleDto dto = (SampleDto) _msg;
                dto.setThreadName(Thread.currentThread().getName());
                data_.add(dto);
            }
            try {
                Thread.sleep(sleepTime_);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.info(String.format("Received(<-|) [%1$s/%2$s]: %3$s",
                df.format(new Date()), beanName_, _msg));
    }

Something in the Camel libs seems to be blocking...

2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-17                        | Sent [20110707
09:03:08:335]: SampleDtoImpl[data_13;thread=Thread-17]
2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-6                         | Sent [20110707
09:03:08:335]: SampleDtoImpl[data_2;thread=Thread-6]
2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-16                        | Sent [20110707
09:03:08:335]: SampleDtoImpl[data_12;thread=Thread-16]
2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-5                         | Sent [20110707
09:03:08:351]: SampleDtoImpl[data_1;thread=Thread-5]
2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-8                         | Sent [20110707
09:03:08:351]: SampleDtoImpl[data_4;thread=Thread-8]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-14                        | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_10;thread=Thread-14]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-13                        | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_9;thread=Thread-13]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-18                        | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_14;thread=Thread-18]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-4                         | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_0;thread=Thread-4]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-10                        | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_6;thread=Thread-10]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-15                        | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_11;thread=Thread-15]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-9                         | Sent [20110707
09:03:08:367]: SampleDtoImpl[data_5;thread=Thread-9]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-11                        | Sent [20110707
09:03:08:382]: SampleDtoImpl[data_7;thread=Thread-11]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-12                        | Sent [20110707
09:03:08:382]: SampleDtoImpl[data_8;thread=Thread-12]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl |
send                 | Thread-7                         | Sent [20110707
09:03:08:382]: SampleDtoImpl[data_3;thread=Thread-7]
2011-07-07 09:03:08,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(|->)
[20110707 09:03:08:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_13;thread=Thread-17]
2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(<-|)
[20110707 09:03:09:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_13;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(|->)
[20110707 09:03:09:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_4;thread=Thread-8]
2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(<-|)
[20110707 09:03:10:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_4;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(|->)
[20110707 09:03:10:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_1;thread=Thread-5]
2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(<-|)
[20110707 09:03:11:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_1;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(|->)
[20110707 09:03:11:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_2;thread=Thread-6]
2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(<-|)
[20110707 09:03:12:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_2;thread=Camel (camel-4) thread #14 - Threads]
2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(|->)
[20110707 09:03:12:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_12;thread=Thread-16]
2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(<-|)
[20110707 09:03:13:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_12;thread=Camel (camel-4) thread #15 - Threads]
2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(|->)
[20110707 09:03:13:304/esbTestJmsTopicListener]:
SampleDtoImpl[data_9;thread=Thread-13]
2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(<-|)
[20110707 09:03:14:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_9;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(|->)
[20110707 09:03:14:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_6;thread=Thread-10]
2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(<-|)
[20110707 09:03:15:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_6;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(|->)
[20110707 09:03:15:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_0;thread=Thread-4]
2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(<-|)
[20110707 09:03:16:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_0;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(|->)
[20110707 09:03:16:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_10;thread=Thread-14]
2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(<-|)
[20110707 09:03:17:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_10;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(|->)
[20110707 09:03:17:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_14;thread=Thread-18]
2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(<-|)
[20110707 09:03:18:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_14;thread=Camel (camel-4) thread #15 - Threads]
2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(|->)
[20110707 09:03:18:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_5;thread=Thread-9]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(<-|)
[20110707 09:03:19:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_5;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(|->)
[20110707 09:03:19:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_11;thread=Thread-15]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(<-|)
[20110707 09:03:20:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_11;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(|->)
[20110707 09:03:20:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_8;thread=Thread-12]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(<-|)
[20110707 09:03:21:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_8;thread=Camel (camel-4) thread #14 - Threads]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(|->)
[20110707 09:03:21:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_3;thread=Thread-7]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(<-|)
[20110707 09:03:22:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_3;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(|->)
[20110707 09:03:22:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_7;thread=Thread-11]
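
The pattern in this log (a different pool thread each time, but never two at
once) is what you get whenever a single consumer hands a message to a pool and
then waits for it to finish before taking the next one. The sketch below
reproduces that effect with plain java.util.concurrent and no Camel at all. It
is not a claim about Camel's internals; whether the JMS consumer in a given
Camel version behaves this way is an assumption worth checking. All names here
(HandOffDemo, blockingHandOff, asyncHandOff) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class HandOffDemo {
    private HandOffDemo() {}

    /** Stand-in for the listener bean: just sleeps. */
    static void handle(long sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Waits for one task, converting checked exceptions to unchecked. */
    static void await(Future<?> f) {
        try {
            f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** One consumer submits a task and blocks until it completes. */
    static long blockingHandOff(int messages, int poolSize, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        long start = System.nanoTime();
        try {
            for (int i = 0; i < messages; i++) {
                await(pool.submit(() -> handle(sleepMillis))); // next message waits here
            }
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** One consumer submits every task first and only then waits. */
    static long asyncHandOff(int messages, int poolSize, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        long start = System.nanoTime();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < messages; i++) {
                pending.add(pool.submit(() -> handle(sleepMillis)));
            }
            for (Future<?> f : pending) {
                await(f);
            }
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("blocking hand-off ms: " + blockingHandOff(5, 5, 200));
        System.out.println("async hand-off ms:    " + asyncHandOff(5, 5, 200));
    }
}
```

With the blocking hand-off the elapsed time grows with the number of messages
even though the pool has a free thread for each; with the asynchronous
hand-off the tasks overlap.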






-----
Ben O'Day
IT Consultant -http://consulting-notes.com
