camel-users mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Using SEDA without losing messages
Date Mon, 30 Jan 2012 09:17:47 GMT
Hi

Check the seda documentation.
http://camel.apache.org/seda

There is a blockWhenFull option that you can set to true to make the
caller block when the seda queue is full.
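
As a rough illustration of why 9 messages rather than 8 leave the source
queue with size=4 and concurrentConsumers=4, here is a small sketch. It is
not Camel code: it models the seda queue with a plain
java.util.concurrent.LinkedBlockingQueue, and the class and method names
(SedaBackpressureSketch, pulledBeforeBlocking) are made up for this example.
It assumes the thread that reads from the source is the same thread that
blocks on the full queue, so it is already holding one message when it stalls.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SedaBackpressureSketch {

    /** Counts how many messages leave the source before the producer stalls. */
    public static int pulledBeforeBlocking(int size, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(size); // models the seda queue
        AtomicInteger pulled = new AtomicInteger(); // messages taken from the source
        AtomicInteger busy = new AtomicInteger();   // consumers currently "processing"
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    queue.take();            // consumer picks up one message
                    busy.incrementAndGet();
                    release.await();         // stands in for the slow Thread.sleep(10000)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        Thread producer = new Thread(() -> {
            try {
                for (int msg = 0; msg < 20; msg++) { // 20 messages, as in the test
                    pulled.incrementAndGet();        // message has already left the source
                    queue.put(msg);                  // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        try {
            // Wait until every consumer is busy, the queue is full,
            // and the producer is parked inside put().
            long deadline = System.currentTimeMillis() + 5000;
            while (!(busy.get() == consumers
                    && queue.remainingCapacity() == 0
                    && producer.getState() == Thread.State.WAITING)) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("system never stalled");
                }
                Thread.sleep(10);
            }
            return pulled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producer.interrupt();   // stop the blocked producer
            release.countDown();    // let the consumers finish
        }
    }

    public static void main(String[] args) {
        System.out.println("size=4: " + pulledBeforeBlocking(4, 4));
        System.out.println("size=1: " + pulledBeforeBlocking(1, 4));
    }
}
```

Under those assumptions the count is concurrentConsumers + size + 1, that is
4 + 4 + 1 = 9 for size=4 and 4 + 1 + 1 = 6 for size=1, which lines up with
the numbers reported in the test below.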



On Mon, Jan 30, 2012 at 7:20 AM, Sabin Timalsena
<stimalsena@veriskhealth.com> wrote:
> Hello,
>
>
>
> I'm a beginner in Camel and ActiveMQ and was recently trying to study the
> behavior of SEDA queues.
>
> I'm not sure I understand the "size" property of SEDA queues. Say the queue
> has (size=4) and (concurrentConsumers=4). 4 messages are brought into the
> queue. My understanding is that, as soon as the processing of those 4
> messages is started, 4 more are brought into the SEDA queue. So my
> assumption was that 8 messages would be dequeued initially from the place
> this SEDA queue is consuming from. However, in my tests, 9 messages were
> dequeued.
>
>
>
> Here's the setup I used for testing this behavior:
>
>
>
>     private static final String SEDA_URI =
>         "seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";
>
>     ...
>
>     from("activemq:start1")
>         .wireTap("direct:wiretap")
>         .to(SEDA_URI);
>
>     from(SEDA_URI)
>         .process(new Processor() {
>             @Override
>             public void process(Exchange ex) throws Exception {
>                 Message in = ex.getIn();
>                 LOGGER.info("Procesing Message: " + in.getBody());
>                 Thread.sleep(10000);
>             }
>         });
>
>     from("direct:wiretap")
>         .process(new Processor() {
>             @Override
>             public void process(Exchange exchange) throws Exception {
>                 Message in = exchange.getIn();
>                 LOGGER.info("Tapped Message: " + in.getBody());
>             }
>         });
>
>
>
> The "activemq:start1" has 20 messages initially.
>
> Here's the output obtained just after the test is started:
>
>
>
> [ient) thread #5 - seda://tasks] SEDATests                      INFO  Procesing Message: Message 0
> [ient) thread #2 - seda://tasks] SEDATests                      INFO  Procesing Message: Message 1
> [ient) thread #3 - seda://tasks] SEDATests                      INFO  Procesing Message: Message 2
> [ient) thread #4 - seda://tasks] SEDATests                      INFO  Procesing Message: Message 3
> [el-client) thread #6 - WireTap] SEDATests                      INFO  Tapped Message: Message 0
> [el-client) thread #7 - WireTap] SEDATests                      INFO  Tapped Message: Message 1
> [el-client) thread #8 - WireTap] SEDATests                      INFO  Tapped Message: Message 2
> [el-client) thread #9 - WireTap] SEDATests                      INFO  Tapped Message: Message 3
> [l-client) thread #10 - WireTap] SEDATests                      INFO  Tapped Message: Message 4
> [l-client) thread #11 - WireTap] SEDATests                      INFO  Tapped Message: Message 5
> [l-client) thread #12 - WireTap] SEDATests                      INFO  Tapped Message: Message 6
> [l-client) thread #13 - WireTap] SEDATests                      INFO  Tapped Message: Message 7
> [l-client) thread #14 - WireTap] SEDATests                      INFO  Tapped Message: Message 8
>
>
>
> When I change the SEDA endpoint URI to have "size=1", 6 messages are
> dequeued initially. I don't understand why that extra one message is being
> dequeued. When full, does SEDA block *after* dequeuing the message?
>
>
>
> My objective is to prevent messages from being dequeued from
> "activemq:start1", if they won't be processed immediately.
>
> Please give me some suggestions.
>
>
>
> Thanks in advance...
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
