camel-dev mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Missing feature to handle errors in a route which reads from an activemq destination
Date Sun, 31 Jul 2011 12:20:52 GMT
Frankly, I think the redelivery logic should be part of Apache ActiveMQ.

Your solution is brittle in that you consume the message and then
send it back to the broker. What happens if you cannot send the
message back to the broker?
It would be better if the broker handled all of this out of the box.

ActiveMQ already has a redelivery policy where you can configure various
delay options, such as exponential backoff.
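
To illustrate what exponential backoff means for the delays, here is a
minimal, self-contained sketch. It is not ActiveMQ code; the class and
method names (BackoffSchedule, delays) are made up for illustration, and
the inputs mirror the redelivery delay, back-off multiplier, and maximum
redeliveries used in the quoted test below.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models an exponential backoff schedule, not ActiveMQ's implementation. */
final class BackoffSchedule {

    /** Returns the delay in milliseconds before each redelivery, first redelivery first. */
    static List<Long> delays(long initialDelayMillis, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maximumRedeliveries; attempt++) {
            result.add(delay);
            // Each further redelivery waits backOffMultiplier times longer than the previous one.
            delay = (long) (delay * backOffMultiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(200, 2.0, 5));
    }
}
```

With an initial delay of 200 ms, a multiplier of 2, and 5 redeliveries, this
prints [200, 400, 800, 1600, 3200]; the last value matches the 3200 ms
asserted in the quoted test.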

There is also a JIRA ticket for ActiveMQ to support asynchronous
scheduled redeliveries:
https://issues.apache.org/jira/browse/AMQ-1853
This has the potential to let consumers pick up the next message
instead of blocking while they wait to issue the redelivery.
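
The difference can be sketched with a toy model. This is not ActiveMQ code
and not necessarily how AMQ-1853 would be implemented. It assumes a single
consumer, a virtual clock on which each delivery attempt takes 1 ms, and one
message that fails on its first delivery; all names
(ScheduledRedeliverySketch, run, failOnce) are hypothetical. A failed
message is parked until its delay has elapsed, and the consumer carries on
with the next message in the meantime.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (not ActiveMQ code) of non-blocking, scheduled redelivery. */
final class ScheduledRedeliverySketch {

    /** A redelivery that becomes due at a point on the virtual clock. */
    private static final class Scheduled {
        final String body;
        final long dueAt;

        Scheduled(String body, long dueAt) {
            this.body = body;
            this.dueAt = dueAt;
        }
    }

    /**
     * Consumes all messages. The message equal to failOnce fails on its first
     * delivery and is rescheduled delayMillis later instead of blocking the
     * consumer. Returns the delivery attempts in the order they happened.
     */
    static List<String> run(List<String> messages, String failOnce, long delayMillis) {
        Deque<String> ready = new ArrayDeque<>(messages);
        PriorityQueue<Scheduled> delayed =
            new PriorityQueue<>(Comparator.comparingLong((Scheduled s) -> s.dueAt));
        List<String> attempts = new ArrayList<>();
        boolean alreadyFailed = false;
        long now = 0;

        while (!ready.isEmpty() || !delayed.isEmpty()) {
            // Move redeliveries whose delay has elapsed back to the ready queue.
            while (!delayed.isEmpty() && delayed.peek().dueAt <= now) {
                ready.addLast(delayed.poll().body);
            }
            if (ready.isEmpty()) {
                // Nothing to do right now: advance the virtual clock to the next due redelivery.
                now = delayed.peek().dueAt;
                continue;
            }
            String body = ready.pollFirst();
            now += 1; // each delivery attempt costs 1 ms of virtual time
            if (body.equals(failOnce) && !alreadyFailed) {
                alreadyFailed = true;
                attempts.add(body + " (failed)");
                delayed.add(new Scheduled(body, now + delayMillis));
            } else {
                attempts.add(body);
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("A", "B", "C"), "A", 200));
    }
}
```

For messages A, B, C where A fails once, the attempts come out in the order
A (failed), B, C, A: B and C are not held up by the 200 ms delay.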



On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller
<christian.mueller@gmail.com> wrote:
> Hello!
>
> I didn't get as many responses as I had hoped. Maybe it was not clear what
> I am trying to do or what I currently miss in Camel.
> Therefore I built a small unit test (and attached it) to demonstrate what
> I'm looking for (for convenience I have also pasted the code into this mail
> at the end).
>
> I built my own (simple) redelivery processor, which I use to re-enqueue
> messages into ActiveMQ after the default Camel error handler has kicked in
> and caught the exception. I use the same logic as Camel does (but much
> simpler for this test) to track the redelivery delay and count. But instead
> of waiting for the retry, I put the message back into ActiveMQ for
> redelivery and set the "AMQ_SCHEDULED_DELAY" header, which ActiveMQ uses
> to delay the delivery of the message:
>
> public class RedeliveryTest extends CamelTestSupport {
>
>     @EndpointInject(uri = "mock:intercept")
>     private MockEndpoint intercept;
>
>     @EndpointInject(uri = "mock:dlq")
>     private MockEndpoint dlq;
>
>     @Before
>     public void setUp() throws Exception {
>         disableJMX();
>
>         super.setUp();
>     }
>
>     @Test
>     public void redeliveryTest() throws Exception {
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("activemq:queue:dlq")
>                     .to("mock:dlq");
>             }
>         });
>
>         intercept.expectedMessageCount(6); // 1 + 5 redeliveries
>         dlq.expectedMessageCount(1);
>
>         template.sendBody("activemq:queue:start", "Hello Camel!");
>
>         intercept.assertIsSatisfied();
>         dlq.assertIsSatisfied();
>
>         Exchange exchange = dlq.getExchanges().get(0);
>         assertEquals(Long.valueOf(5),
>             exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
>         assertEquals(Long.valueOf(3200L),
>             exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
>     }
>
>     @Override
>     protected JndiRegistry createRegistry() throws Exception {
>         JndiRegistry registry = super.createRegistry();
>
>         ActiveMQComponent activemq = ActiveMQComponent.activeMQComponent(
>             "vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
>         registry.bind("activemq", activemq);
>
>         return registry;
>     }
>
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 context.addInterceptStrategy(new Tracer());
>
>                 ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor =
>                     new ActiveMqRedeliveryProcessor();
>                 activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
>                 activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
>                 activeMqRedeliveryProcessor.setRedeliveryDelay(200);
>                 activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
>                 activeMqRedeliveryProcessor.setMaximumRedeliveries(5);
>
>                 onException(Exception.class)
>                     .handled(true)
>                     .bean(activeMqRedeliveryProcessor)
>                     .end();
>
>                 from("activemq:queue:start").routeId("main")
>                     .to("mock:intercept")
>                     .throwException(new Exception("forced Exception for test!"));
>             }
>         };
>     }
> }
>
> And the ActiveMqRedeliveryProcessor is:
>
> public class ActiveMqRedeliveryProcessor {
>
>     private String redeliveryEndpoint;
>     private String deadLetterEndpoint;
>
>     private long redeliveryDelay = 0L;
>     private double backOffMultiplier = 1;
>     private int maximumRedeliveries = 0;
>
>     public void process(Exchange exchange) throws Exception {
>         JmsMessage message = exchange.getIn(JmsMessage.class);
>
>         Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", Long.class);
>         Long redeliveryCount = message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);
>
>         if (redeliveryCount == null) {
>             redeliveryCount = 0L;
>         }
>
>         ProducerTemplate template = new DefaultProducerTemplate(exchange.getContext());
>         template.start();
>         try {
>             if (redeliveryCount < maximumRedeliveries) {
>                 // Still within the retry budget: increase the counter and the delay.
>                 redeliveryCount = redeliveryCount + 1;
>                 message.setHeader("CamelActiveMqRedeliveryCounter", redeliveryCount);
>
>                 if (delay == null) {
>                     delay = redeliveryDelay;
>                 } else {
>                     delay = (long) (delay * backOffMultiplier);
>                 }
>                 message.setHeader("scheduledJobId", null);
>                 message.setHeader("CamelActiveMqRedeliveryDelay", delay);
>                 message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
>
>                 template.send(redeliveryEndpoint, exchange);
>             } else {
>                 // Retries exhausted: hand the message over to the dead letter queue.
>                 template.send(deadLetterEndpoint, exchange);
>             }
>         } finally {
>             // Stop the template even if sending throws.
>             template.stop();
>         }
>     }
>
>     public void setRedeliveryDelay(long redeliveryDelay) {
>         this.redeliveryDelay = redeliveryDelay;
>     }
>
>     public void setBackOffMultiplier(double backOffMultiplier) {
>         this.backOffMultiplier = backOffMultiplier;
>     }
>
>     public void setMaximumRedeliveries(int maximumRedeliveries) {
>         this.maximumRedeliveries = maximumRedeliveries;
>     }
>
>     public void setRedeliveryEndpoint(String redeliveryEndpoint) {
>         this.redeliveryEndpoint = redeliveryEndpoint;
>     }
>
>     public void setDeadLetterEndpoint(String deadLetterEndpoint) {
>         this.deadLetterEndpoint = deadLetterEndpoint;
>     }
> }
>
> I'm looking for a better integration of this feature into Camel, if other
> people also think this is a common use case (letting ActiveMQ handle the
> redelivery instead of having long-lived in-flight messages when the delay
> is more than a few minutes). This integration could look like:
>
> errorHandler(
>   activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
>   .maximumRedeliveries(8)
>   .deliveryDelay(60000)
>   .useExponentialBackOff()
>   .backOffMultiplier(2));
> Where "activemq:queue:FOO" is the queue for redelivery and
> "activemq:queue:DLQ" is the dead letter queue.
>
> I'm really interested in your opinions on whether this is also useful for
> other people or is only a special requirement on my side.
>
> Best,
> Christian
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
