camel-users mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Is asyncDelayedRedelivery really async?
Date Wed, 25 Jan 2012 04:47:26 GMT
On Tue, Jan 24, 2012 at 8:22 PM, Masters, Bill
<Bill.Masters@centurylink.com> wrote:
> Hi Claus.
>
> I looked at http://camel.apache.org/asynchronous-routing-engine.html.
> I'm trying to implement a route that reads a file and transfers it via SFTP.
> If the destination is down for maintenance, I'd like to retry the transfer later (say
> 15 minutes) up to 4 times, but without blocking the threads.
> It looks like SFTP is not on the list of components that support the async routing engine.
> True?
> If so, what is the right way to implement asyncDelayedRedelivery?
>

For redeliveries it doesn't matter whether a producer natively supports
async or not, because Camel itself performs the redelivery.
So from the producer's point of view, it's just
another processing that happens 15 minutes later.
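
As a rough illustration of that point, here is a plain-Java sketch (not
Camel's actual implementation) of a failed call being retried later from a
scheduled thread pool. The class and method names are hypothetical, send()
is a stand-in for a producer, and the short delay stands in for the 15 minutes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DelayedRedeliverySketch {

    /** Hypothetical stand-in for a producer: fails on attempt 1, succeeds afterwards. */
    static void send(int attempt) {
        if (attempt < 2) {
            throw new IllegalStateException("destination down");
        }
    }

    /**
     * Makes one attempt on the calling thread. If it fails, schedules the second
     * attempt on the pool instead of sleeping, and returns the name of the thread
     * that ran the successful attempt.
     */
    static String deliver(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            try {
                send(1);
                return Thread.currentThread().getName(); // delivered on the first attempt
            } catch (IllegalStateException firstFailure) {
                ScheduledFuture<String> retry = scheduler.schedule(() -> {
                    send(2); // to the producer this is just another call, made later
                    return Thread.currentThread().getName();
                }, delayMillis, TimeUnit.MILLISECONDS);
                // The calling thread does not sleep through the delay; it waits
                // here only because this demo wants to report the result.
                return retry.get();
            }
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller thread:     " + Thread.currentThread().getName());
        System.out.println("redelivery thread: " + deliver(200));
    }
}
```

The second attempt runs on a pool thread, so the producer does not need any
async support of its own for the retry to happen later.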

So you should be fine. If you configure the Camel error handler to use
async delayed redelivery, then no threads are blocked.
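
For your case, the error handler configuration might look roughly like the
fragment below. Treat it as an untested sketch that assumes camel-core and
camel-ftp on the classpath: the class name, the file:outbox directory and
the SFTP user, host and password are hypothetical placeholders, while the
15 minute delay and the 4 redeliveries come from your question.

```java
import org.apache.camel.builder.RouteBuilder;

public class SftpRetryRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Up to 4 redeliveries, 15 minutes apart, scheduled asynchronously
        // by the error handler instead of sleeping on the current thread.
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(4)
            .redeliveryDelay(15 * 60 * 1000L)
            .asyncDelayedRedelivery());

        // Placeholder endpoints: pick up files locally and push them via SFTP.
        from("file:outbox")
            .to("sftp://user@host/inbox?password=secret");
    }
}
```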


> Thanks
>
> Bill Masters
> Centurylink
>
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ibsen@gmail.com]
> Sent: Friday, December 09, 2011 5:27 AM
> To: users@camel.apache.org
> Subject: Re: Is asyncDelayedRedelivery really async?
>
> Yes, it's async: it uses a scheduled thread pool to schedule a time in the future
> to wake up and execute the redelivery.
>
> However, as it's using the async routing engine, the components in play (most often
> the consumer) need to support the async routing engine.
> You can see a list of supported components here:
> http://camel.apache.org/asynchronous-routing-engine.html
> And you can argue that seda supports that as well.
>
> Some components do not support async routing by nature, and so the thread will block
> in the consumer until the exchange is done.
>
>
> 2011/12/9 Zhemzhitsky Sergey <Sergey_Zhemzhitsky@troika.ru>:
>> Hi guys,
>>
>> Could you please explain how asyncDelayedRedelivery of redelivery policies really
>> works? I must not block the route when an exchange fails and has to be redelivered,
>> so I'm a little confused about how to use async redelivery properly with different types
>> of endpoints.
>>
>> I have a route that has a timer endpoint as its source. Events are fired every
>> second.
>> I also configured an executor service with 10 threads for my error handler.
>>
>> The 1st and the 4th sent messages are marked to always be redelivered.
>> So I expected the 6th sent message (4th arrived) to arrive no later than 2 seconds
>> (the timer fires every second) after the 3rd sent message (2nd arrived), because the redelivery
>> is asynchronous. However, my test fails because the redelivery of the 1st message happens synchronously
>> and the 6th sent message arrives about 6 seconds after the 3rd sent message.
>>
>> I also tried to use a seda endpoint. It seems to work fine until I set the waitForTaskToComplete
>> parameter to Always. In that case the redelivery is synchronous too.
>>
>> With quartz endpoints everything depends on the size of the quartz thread pool: if
>> its size is more than 1 and redelivery is synchronous, quartz continues to fire events until
>> the thread pool is exhausted. With async redelivery the behavior is the same, except that the
>> thread used for redelivery differs from the calling thread.
>>
>> From everything described above, I understand that the calling thread always waits for the exchange
>> to complete, even if the redelivery should happen asynchronously.
>>
>>
>> // BELOW IS MY UNIT TEST
>>
>> package org.foo.bar;
>>
>> import java.util.Properties;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.TimeUnit;
>> import java.util.concurrent.atomic.AtomicInteger;
>>
>> import javax.naming.Context;
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.component.quartz.QuartzComponent;
>> import org.apache.camel.processor.RedeliveryPolicy;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.junit.Test;
>>
>> public class AsyncDelayedRedeliveryTest extends CamelTestSupport {
>>
>>    private static final int MAXIMUM_REDELIVERIES = 1;
>>
>>    @Test
>>    public void asyncRedeliveryTimer() throws Exception {
>>        context().addRoutes(new RouteBuilder() {
>>            @Override
>>            public void configure() throws Exception {
>>
>>                errorHandler(defaultErrorHandler().executorServiceRef("executorService"));
>>
>>                onException(Exception.class)
>>                    .redeliveryPolicyRef("redeliveryPolicy")
>>                    .handled(true)
>>                    .onRedelivery(new Processor() {
>>                        @Override
>>                        public void process(Exchange exchange) throws Exception {
>>                            System.out.println("Redelivered : " + exchange.getIn().getBody() + " : " + Thread.currentThread().getName());
>>                        }
>>                    })
>>                    .to("mock:exception");
>>
>>                from("timer:start?repeatCount=10&period=1000")
>> //                from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
>> //                    .to("seda:next?waitForTaskToComplete=Never&timeout=100");
>>
>> //                from("seda:next?concurrentConsumers=2&size=2")
>>                    .process(new Processor() {
>>                        private AtomicInteger counter = new AtomicInteger();
>>
>>                        @Override
>>                        public void process(Exchange exchange) throws Exception {
>>                            if (counter.compareAndSet(0, 1) || counter.compareAndSet(3, 4)) {
>>                                exchange.setProperty("ThrowException", Boolean.TRUE);
>>                                exchange.getIn().setBody(counter.get() - 1);
>>                            } else {
>>                                exchange.getIn().setBody(counter.getAndIncrement());
>>                            }
>>                        }
>>                    })
>>                    .process(new Processor() {
>>                        @Override
>>                        public void process(Exchange exchange) throws Exception {
>>                            if (Boolean.TRUE.equals(exchange.getProperty("ThrowException", Boolean.class))) {
>>                                throw new RuntimeException("Test Exception!");
>>                            }
>>                        }
>>                    })
>>                    .to("mock:result");
>>            }
>>        });
>>
>>        MockEndpoint result = getMockEndpoint("mock:result");
>>        result.whenAnyExchangeReceived(new Processor() {
>>            @Override
>>            public void process(Exchange exchange) throws Exception {
>>                System.out.println("Message :     " + exchange.getIn().getBody() + " : " + Thread.currentThread().getName());
>>            }
>>        });
>>        result.expectedMessageCount(8);
>>        result.allMessages().property("ThrowException").isNull();
>>
>>        // the 1st and 4th sent messages are always redelivered,
>>        // so the 4th delivered message is the 6th sent message
>>        result.message(3).body().isEqualTo(5);
>>        // as we're trying to use async redelivery, we expect the 6th sent
>>        // message (4th arrived) to arrive no later than 2 seconds after the
>>        // 3rd sent message (2nd arrived)
>>        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
>>        // the same as the previous assertion
>>        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();
>>
>>        MockEndpoint exception = getMockEndpoint("mock:exception");
>>        exception.expectedMessageCount(2);
>>        exception.allMessages().property("ThrowException").isNotNull();
>>
>>        startCamelContext();
>>
>>        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
>>    }
>>
>>    @Override
>>    protected Context createJndiContext() throws Exception {
>>        Context jndiContext = super.createJndiContext();
>>
>>        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>>        redeliveryPolicy.setAsyncDelayedRedelivery(false);
>>        redeliveryPolicy.setLogRetryAttempted(true);
>>        redeliveryPolicy.setLogExhausted(false);
>>        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
>>        redeliveryPolicy.setRedeliveryDelay(5000);
>>        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);
>>
>>        Properties props = new Properties();
>>        props.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
>>        props.setProperty("org.quartz.scheduler.rmi.export", "false");
>>        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
>>        props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
>>        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
>>        props.setProperty("org.quartz.threadPool.threadCount", "5");
>>        props.setProperty("org.quartz.threadPool.threadPriority", "5");
>>        props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
>>        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
>>        props.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
>>        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
>>
>>        QuartzComponent quartz = new QuartzComponent();
>>        quartz.setProperties(props);
>>        jndiContext.bind("quartz", quartz);
>>
>>        ExecutorService executors = Executors.newScheduledThreadPool(10);
>>        jndiContext.bind("executorService", executors);
>>
>>        return jndiContext;
>>    }
>>
>>    @Override
>>    public boolean isUseRouteBuilder() {
>>        return false;
>>    }
>>
>> }
>>
>> Best Regards,
>> Sergey
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
