camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Is asyncDelayedRedelivery really async?
Date Fri, 09 Dec 2011 12:27:09 GMT
Yes its async, as it uses a scheduled thread pool, to schedule a time
in the future, to wake up, and execute the redelivery.

However as its using the async routing engine, then the components in
play (eg most often the consumer) need to support async routing
engine.
You can see a list of supported here
http://camel.apache.org/asynchronous-routing-engine.html
And you can argue that seda support that as well.

Some components do not support async routing by nature, and so the
thread will block in the consumer, until the exchange is done.


2011/12/9 Zhemzhitsky Sergey <Sergey_Zhemzhitsky@troika.ru>:
> Hi guys,
>
> Could you please explain how asyncDelayedRedelivery of redelivery policies really works,
because I have not to block the route when some exchange fails and it should be redelivered.
So I’m a little bit confused of how to use async. redelivery properly with different types
of endpoints.
>
> I have a route that that have timer endpoint as its source. Events are fired every second.
> I also configured executor service with 10 threads for my error handler.
>
> The 1st  and the 4th sent messages are marked to be always redelivered.
> So I expected the 6th sent message (4th arrived) to be arrived no later than 2 seconds
(time fires every second) after the 3rd sent message (2nd arrived), because the redelivery
is asynchronous. However my test fails because the redelivery of the 1st message happens synchronously
and the 6th sent message arrives about 6 seconds  after the 3rd sent message.
>
> I also tried to use seda endpoint. It seems to work fine until I set waitForTaskToComplete
parameter to Always. In that case the redelivery is synchronous too.
>
> With quartz endpoints everything depends on the size of the quartz thread pool, so if
its size is more than 1 and redelivery is synchronous quartz continues to fire events until
thread pool is exhausted. With async. redelivery the behavior is the same except that the
thread that is used  for redelivery differs from the calling thread.
>
> From all described above I understood that calling thread always waits for the exchange
to complete even if the redelivery is should happen asynchronously.
>
>
> // BELOW IS MY UNIT TEST
>
> package org.foo.bar;
>
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import javax.naming.Context;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.component.quartz.QuartzComponent;
> import org.apache.camel.processor.RedeliveryPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>
>    private static final int MAXIMUM_REDELIVERIES = 1;
>
>    @Test
>    public void asyncRedeliveryTimer() throws Exception {
>        context().addRoutes(new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>                errorHandler(defaultErrorHandler().executorServiceRef("executorService"));
>
>                onException(Exception.class)
>                    .redeliveryPolicyRef("redeliveryPolicy")
>                    .handled(true)
>                    .onRedelivery(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws Exception
{
>                            System.out.println("Redelivered : " + exchange.getIn().getBody()
+ " : " +Thread.currentThread().getName());
>                        }
>                    })
>                    .to("mock:exception");
>
>                from("timer:start?repeatCount=10&period=1000")
> //                from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
> //                    .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>
> //                from("seda:next?concurrentConsumers=2&size=2")
>                    .process(new Processor() {
>                        private AtomicInteger counter = new AtomicInteger();
>
>                        @Override
>                        public void process(Exchange exchange) throws Exception
{
>                            if (counter.compareAndSet(0, 1) || counter.compareAndSet(3,
4)) {
>                                exchange.setProperty("ThrowException",
Boolean.TRUE);
>                                exchange.getIn().setBody(counter.get()
- 1);
>                            } else {
>                                exchange.getIn().setBody(counter.getAndIncrement());
>                            }
>                        }
>                    })
>                    .process(new Processor() {
>                        @Override
>                        public void process(Exchange exchange) throws Exception
{
>                            if(Boolean.TRUE.equals(exchange.getProperty("ThrowException",
Boolean.class))) {
>                                throw new RuntimeException("Test Exception!");
>                            }
>                        }
>                    })
>                    .to("mock:result");
>            }
>        });
>
>        MockEndpoint result = getMockEndpoint("mock:result");
>        result.whenAnyExchangeReceived(new Processor() {
>            @Override
>            public void process(Exchange exchange) throws Exception {
>                System.out.println("Message :     " + exchange.getIn().getBody()
+ " : " +Thread.currentThread().getName());
>            }
>        });
>        result.expectedMessageCount(8);
>        result.allMessages().property("ThrowException").isNull();
>
>        // the 1th and 4th sent messages are always redelivered,
>        // so the 4th delivered message is the 6th sent message
>        result.message(3).body().isEqualTo(5);
>        // as we're trying to use async redelivery we're expecting that
>        // the 6th sent message (4th arrived) must arrive no later than 3rd send message
(2nd arrived)
>        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>        // the same as the prev. assertion
>        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>
>        MockEndpoint exception = getMockEndpoint("mock:exception");
>        exception.expectedMessageCount(2);
>        exception.allMessages().property("ThrowException").isNotNull();
>
>        startCamelContext();
>
>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>    }
>
>    @Override
>    protected Context createJndiContext() throws Exception {
>        Context jndiContext = super.createJndiContext();
>
>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>        redeliveryPolicy.setLogRetryAttempted(true);
>        redeliveryPolicy.setLogExhausted(false);
>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>        redeliveryPolicy.setRedeliveryDelay(5000);
>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>
>        Properties props = new Properties();
>        props.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>        props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction",
"false");
>        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>        props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread",
"true");
>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>        props.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
>        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
>
>        QuartzComponent quartz = new QuartzComponent();
>        quartz.setProperties(props);
>        jndiContext.bind("quartz", quartz);
>
>        ExecutorService executors = Executors.newScheduledThreadPool(10);
>        jndiContext.bind("executorService", executors);
>
>        return jndiContext;
>    }
>
>    @Override
>    public boolean isUseRouteBuilder() {
>        return false;
>    }
>
> }
>
> Best Regards,
> Sergey
>
> _______________________________________________________
>
> The information contained in this message may be privileged and conf idential and protected
from disclosure. If you are not the original intended recipient, you are hereby notified that
any review, retransmission, dissemination, or other use of, or taking of any action in reliance
upon, this information is prohibited. If you have received this communication in error, please
notify the sender immediately by replying to this message and delete it from your computer.
Thank you for your cooperation. Troika Dialog, Russia.
> If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to
www.troika.ru/eng/Contacts/system.wbp
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Mime
View raw message