camel-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christian Müller <christian.muel...@gmail.com>
Subject Re: Missing feature to handle errors in a route which reads from an activemq destination
Date Sun, 31 Jul 2011 17:47:55 GMT
For clarification:
We have multiple services/bundles running inside one ServiceMix instance and
sharing the same ActiveMQConnectionFactory which is exported as OSGI service
by the activemq-broker.xml configuration.
We could use a separate ActiveMQConnectionFactory for each service/bundle,
but this looks not as the right way for me. May as a work around until we
have a better solution...

Best,
Christian

On Sun, Jul 31, 2011 at 7:42 PM, Christian Müller <
christian.mueller@gmail.com> wrote:

> Hello Claus!
>
> As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot
> use it in Camel in the way I would like it. To only have one redelivery
> policy in place is not sufficient for us (if you configure the redelivery
> policy in your ActiveMQConnectionFactory). We have multiple applications
> sharing the same broker with different requirements (online, batch, ...).
>
> I know it's also possible to configure it on the ActiveMQConnection [1],
> but as I know we don't have this possibility in Camel right now. Correct me
> if I'm wrong. May be this is a better solution for my needs, when we have to
> possibilities to configure the redelivery policy as an option in the
> ActiveMqComponent:
>
> from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")...
>
> or
>
>
> from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")...
>
> But for this, I think I have to open a JIRA for ActiveMQ, right?
>
> [1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html
>
> Best,
> Christian
>
>
> On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <claus.ibsen@gmail.com>wrote:
>
>> Frankly I thing the redelivery logic should be part of Apache ActiveMQ.
>>
>> Your solution is brittle in the fact that you consume the message, and
>> then send it back to the broker. What happens if you cannot send the
>> message back to the broker?
>> It would be better if the broker handled all this out of the box.
>>
>> ActiveMQ already has redelivery policy where you can configure various
>> delays options such as exponential backoff etc.
>>
>> And there is a JIRA ticket for ActiveMQ to support asynchronous
>> scheduled redeliveries.
>> https://issues.apache.org/jira/browse/AMQ-1853
>> Which has the potential for consumers to pickup the next message, and
>> not block waiting to issue the redelivery.
>>
>>
>>
>> On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller
>> <christian.mueller@gmail.com> wrote:
>> > Hello!
>> >
>> > I didn't get so much responses as I hoped. May it was not clear what I
>> try
>> > to do or what I miss at present in Camel.
>> > Therefore a build a small unit test (and attached it) to demonstrate
>> what
>> > I'm looking for (for convenience I also paste the code into this mail at
>> the
>> > end).
>> >
>> > I build my own (simple) redelivery processor which I use to re-enqueue
>> > messages into ActiveMQ after the default Camel error handler was kicked
>> in
>> > and catched the exception. I use the same logic as Camel does (but much
>> more
>> > simpler for this test) to trace the redelivery delay and count. But
>> instead
>> > to wait for the retry, I put the message message into ActiveMQ for
>> > redelivery and set the "AMQ_SCHEDULED_DELAY" header which is used by
>> > ActiveMQ to delay the delivery of the message:
>> >
>> > public class RedeliveryTest extends CamelTestSupport {
>> >
>> >     @EndpointInject(uri = "mock:intercept")
>> >     private MockEndpoint intercept;
>> >
>> >     @EndpointInject(uri = "mock:dlq")
>> >     private MockEndpoint dlq;
>> >
>> >     @Before
>> >     public void setUp() throws Exception {
>> >         disableJMX();
>> >
>> >         super.setUp();
>> >     }
>> >
>> >     @Test
>> >     public void redelieryTest() throws Exception {
>> >         context.addRoutes(new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 from("activemq:queue:dlq")
>> >                     .to("mock:dlq");
>> >             }
>> >         });
>> >
>> >         intercept.expectedMessageCount(6); // 1 + 5 redeliveries
>> >         dlq.expectedMessageCount(1);
>> >
>> >         template.sendBody("activemq:queue:start", "Hello Camel!");
>> >
>> >         intercept.assertIsSatisfied();
>> >         dlq.assertIsSatisfied();
>> >
>> >         Exchange exchange = dlq.getExchanges().get(0);
>> >         assertEquals(new Long(5),
>> > exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter"));
>> >         assertEquals(new Long(3200l),
>> > exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay"));
>> >     }
>> >
>> >     @Override
>> >     protected JndiRegistry createRegistry() throws Exception {
>> >         JndiRegistry registry = super.createRegistry();
>> >
>> >         ActiveMQComponent activemq =
>> >
>> ActiveMQComponent.activeMQComponent("vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false");
>> >         registry.bind("activemq", activemq);
>> >
>> >         return registry;
>> >     }
>> >
>> >     @Override
>> >     protected RouteBuilder createRouteBuilder() throws Exception {
>> >         return new RouteBuilder() {
>> >             @Override
>> >             public void configure() throws Exception {
>> >                 context.addInterceptStrategy(new Tracer());
>> >
>> >                 ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor
>> =
>> > new ActiveMqRedeliveryProcessor();
>> >
>> >
>> activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start");
>> >
>> > activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq");
>> >                 activeMqRedeliveryProcessor.setRedeliveryDelay(200);
>> >                 activeMqRedeliveryProcessor.setBackOffMultiplier(2.0);
>> >                 activeMqRedeliveryProcessor.setMaximumRedeliveries(5);
>> >
>> >                 onException(Exception.class)
>> >                     .handled(true)
>> >                     .bean(activeMqRedeliveryProcessor)
>> >                     .end();
>> >
>> >                 from("activemq:queue:start").routeId("main")
>> >                     .to("mock:intercept")
>> >                     .throwException(new Exception("forced Exception for
>> > test!"));
>> >             }
>> >         };
>> >     }
>> > }
>> >
>> > And the ActiveMqRedeliveryProcessor is:
>> >
>> > public class ActiveMqRedeliveryProcessor {
>> >
>> >     private String redeliveryEndpoint;
>> >     private String deadLetterEndpoint;
>> >
>> >     private long redeliveryDelay = 0l;
>> >     private double backOffMultiplier = 1;
>> >     private int maximumRedeliveries = 0;
>> >
>> >     public void process(Exchange exchange) throws Exception {
>> >         JmsMessage message = exchange.getIn(JmsMessage.class);
>> >
>> >         Long delay = message.getHeader("CamelActiveMqRedeliveryDelay",
>> > Long.class);
>> >         Long redeliveryCount =
>> > message.getHeader("CamelActiveMqRedeliveryCounter", Long.class);
>> >
>> >         if (redeliveryCount == null) {
>> >             redeliveryCount = new Long(0);
>> >         }
>> >
>> >         ProducerTemplate template = new
>> > DefaultProducerTemplate(exchange.getContext());
>> >         template.start();
>> >
>> >         if (redeliveryCount < maximumRedeliveries) {
>> >             redeliveryCount = new Long(redeliveryCount + 1);
>> >             message.setHeader("CamelActiveMqRedeliveryCounter",
>> > redeliveryCount);
>> >
>> >             if (delay == null) {
>> >                 delay = new Long(redeliveryDelay);
>> >             } else {
>> >                 delay = new Long((long) (delay * backOffMultiplier));
>> >             }
>> >             message.setHeader("scheduledJobId", null);
>> >             message.setHeader("CamelActiveMqRedeliveryDelay", delay);
>> >             message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY,
>> delay);
>> >
>> >             template.send(redeliveryEndpoint, exchange);
>> >         } else {
>> >             template.send(deadLetterEndpoint, exchange);
>> >         }
>> >         template.stop();
>> >     }
>> >
>> >     public void setRedeliveryDelay(long redeliveryDelay) {
>> >         this.redeliveryDelay = redeliveryDelay;
>> >     }
>> >
>> >     public void setBackOffMultiplier(double backOffMultiplier) {
>> >         this.backOffMultiplier = backOffMultiplier;
>> >     }
>> >
>> >     public void setMaximumRedeliveries(int maximumRedeliveries) {
>> >         this.maximumRedeliveries = maximumRedeliveries;
>> >     }
>> >
>> >     public void setRedeliveryEndpoint(String redeliveryEndpoint) {
>> >         this.redeliveryEndpoint = redeliveryEndpoint;
>> >     }
>> >
>> >     public void setDeadLetterEndpoint(String deadLetterEndpoint) {
>> >         this.deadLetterEndpoint = deadLetterEndpoint;
>> >     }
>> > }
>> >
>> > I'm looking for a better integration into Camel for this feature, if
>> other
>> > people also think this is a common use case (let ActiveMQ handle the
>> > redelivery instead of having long live inflight messages if the delay is
>> > more than a few minutes). This integration could looks like:
>> >
>> > errorHandler(
>> >   activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ")
>> >   .maximumRedeliveries(8)
>> >   .deliveryDelay(60000)
>> >   .useExponentialBackOff()
>> >   .backOffMultiplier(2));
>> > Where "activemq:queue:FOO" is the queue for redelivery and
>> > "activemq:queue:DLQ" is the dead letter queue.
>> >
>> > I'm really interested in your opinions whether this is also useful for
>> other
>> > people or is this only a special requirement from my site.
>> >
>> > Best,
>> > Christian
>> >
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message