camel-users mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source
Date Tue, 29 Apr 2014 09:11:04 GMT
On Tue, Apr 29, 2014 at 10:59 AM, kraythe . <kraythe@gmail.com> wrote:
> Yeah, no problem. I was just hoping someone would have the answer. I keep
> plugging away at it. Probably some transaction issue, I don't know. You can
> answer this perhaps :) , when I send a message to an activemq endpoint with
> the InOnly exchange pattern, will that message be subject to the transaction?
> I.e. if the transaction fails, will it get popped off AMQ?
>

Yes, if you do that from a Camel route using the same JMS component /
endpoint that started the transaction (e.g. the same JMS session).

The message on the queue is then committed, and becomes "visible" to
consumers, only when the TX commits.
It's like figure 9.6 in the Camel in Action book.
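
To make the commit/visibility point concrete, here is a toy in-memory
sketch in plain Java. It only models the idea that a send made inside a
transacted session is buffered until commit and dropped on rollback.
ToyQueue and ToySession are hypothetical names made up for this
illustration; they are not Camel, JMS or ActiveMQ classes, and a real
broker does far more than this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a transacted send; NOT the Camel, JMS or ActiveMQ API. */
public class TransactedSendSketch {

    /** Hypothetical stand-in for a broker queue: it only ever holds committed messages. */
    static final class ToyQueue {
        private final Queue<String> committed = new ArrayDeque<>();

        /** Number of messages a consumer could see right now. */
        int visibleCount() {
            return committed.size();
        }

        /** Takes the next visible message, or returns null if there is none. */
        String poll() {
            return committed.poll();
        }
    }

    /** Hypothetical stand-in for a transacted session: sends are buffered until commit. */
    static final class ToySession {
        private final List<Runnable> pendingSends = new ArrayList<>();

        /** Records the send; nothing reaches the queue yet. */
        void send(ToyQueue queue, String body) {
            pendingSends.add(() -> queue.committed.add(body));
        }

        /** Applies every buffered send, making the messages visible. */
        void commit() {
            pendingSends.forEach(Runnable::run);
            pendingSends.clear();
        }

        /** Discards every buffered send; the queue never sees them. */
        void rollback() {
            pendingSends.clear();
        }
    }

    public static void main(String[] args) {
        ToyQueue outbox = new ToyQueue();
        ToySession session = new ToySession();

        session.send(outbox, "case-1");
        System.out.println("before commit:  " + outbox.visibleCount());
        session.commit();
        System.out.println("after commit:   " + outbox.visibleCount());

        session.send(outbox, "case-2");
        session.rollback();
        System.out.println("after rollback: " + outbox.visibleCount());
    }
}
```

In this model the visible count is 0 before the commit, 1 after it, and
still 1 after the rolled-back second send. The same reasoning applies to
an InOnly send to a dead letter queue made on the same session: it shares
the fate of the surrounding transaction.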


> Robert Simmons Jr. MSc. - Lead Java Architect @ EA
> Author of: Hardcore Java (2003) and Maintainable Java (2012)
> LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39
>
>
> On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <claus.ibsen@gmail.com> wrote:
>
>> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kraythe@gmail.com> wrote:
>> > No one has any idea on this? It would be really great if I could find the
>> > formula to get this to work.
>> >
>>
>> I don't think people always have the time to help, especially when
>> it's more complicated, with transactions and a lot of Camel route code.
>>
>> If you want priority help, there are some companies that offer that:
>> http://camel.apache.org/commercial-camel-offerings.html
>>
>> >
>> >
>> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kraythe@gmail.com> wrote:
>> >
>> >> So I think there is a problem with the way rollback is implemented in
>> >> relation to Camel. As far as I can tell there is no way to get the
>> >> following working.
>> >>
>> >> package com.ea.wwce.camel.test.utilities;
>> >>
>> >> import com.ea.wwce.camel.utilities.data.RecordList;
>> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
>> >> import org.apache.camel.ExchangePattern;
>> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
>> >> import org.apache.camel.builder.RouteBuilder;
>> >> import org.apache.camel.component.mock.MockEndpoint;
>> >> import org.testng.annotations.Test;
>> >> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
>> >> import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
>> >> import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
>> >> import static org.apache.camel.ExchangePattern.InOnly;
>> >>
>> >> /** This test suite validates the transaction configuration in the test suite. */
>> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>> >>   private static final String QUEUE_DEAD = "dead";
>> >>   private static final String QUEUE_INBOX = "inbox";
>> >>   private static final String QUEUE_OUTBOX = "outbox";
>> >>   private static final String ROUTE_ID_FEED = "Feed";
>> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>> >>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
>> >>
>> >>   private static final String MOCK_END = "mock:end";
>> >>   private static final String MOCK_BEFORE_TO_QUEUE = "mock:before_to_queue";
>> >>   private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";
>> >>
>> >>   /** Mock endpoints. */
>> >>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue, mockAfterToQueue;
>> >>
>> >>   /** Helper to initialize mocks in the test. */
>> >>   private void initMocks() {
>> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>> >>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>> >>     mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>> >>   }
>> >>
>> >>   @Override
>> >>   protected RouteBuilder createRouteBuilder() {
>> >>     System.out.println("createRouteBuilder");
>> >>     return new RouteBuilder(this.context) {
>> >>       @Override
>> >>       public void configure() {
>> >>         getContext().setTracing(true);
>> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>             .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >>             .onException(RuntimeException.class).handled(true).useOriginalMessage()
>> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>> >>             .unmarshal(dfCaseRecord)
>> >>             .to(MOCK_BEFORE_TO_QUEUE)
>> >>             .marshal(dfCaseRecord)
>> >>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >>             .unmarshal(dfCaseRecord)
>> >>             .to(MOCK_AFTER_TO_QUEUE);
>> >>       }
>> >>     };
>> >>   }
>> >>
>> >>   /** Advice the route, mocking ActiveMQ endpoints. */
>> >>   protected void adviceRoute() throws Exception {
>> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>> >>         this.context, new AdviceWithRouteBuilder() {
>> >>           @Override
>> >>           public void configure() throws Exception {
>> >>             mockEndpoints("activemq:queue:*");
>> >>           }
>> >>         }
>> >>     );
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testNormalRouting() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceived(cases);
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testRollbackBeforeEnqueue() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>> >>     mockDead.expectedBodiesReceived(cases.get(0));
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases.get(1), cases.get(2)));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testRollbackAfterEnqueue() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>   }
>> >> }
>> >>
>> >> I have tried dozens of combinations in the onException clause and
>> >> nothing works. Adding markRollbackOnly() or rollback() only succeeds
>> >> in rolling back the dead letter channel as well. Setting handled(false)
>> >> causes AMQ to redeliver the message and also rolls back the dead letter
>> >> channel. If anyone has a combination that works, I would be grateful.
>> >>
>> >>
>> >>
>> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kraythe@gmail.com> wrote:
>> >>
>> >>> So, in the ongoing search for the perfect transaction configuration, we
>> >>> have an interesting use case. Consider the following route in a test:
>> >>>
>> >>>   @Override
>> >>>   protected RouteBuilder createRouteBuilder() {
>> >>>     System.out.println("createRouteBuilder");
>> >>>     return new RouteBuilder(this.context) {
>> >>>       @Override
>> >>>       public void configure() {
>> >>>         getContext().setTracing(true);
>> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >>>             .onException(RuntimeException.class).handled(true).useOriginalMessage()
>> >>>               .to(endpointAMQ(QUEUE_DEAD)).end()
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord)
>> >>>             .to(MOCK_BEFORE_TO_QUEUE)
>> >>>             .marshal(dfCaseRecord)
>> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >>>             .unmarshal(dfCaseRecord)
>> >>>             .to(MOCK_AFTER_TO_QUEUE);
>> >>>       }
>> >>>     };
>> >>>   }
>> >>>
>> >>> Note that transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
>> >>> up the transaction policy by name; the constant is just a string key
>> >>> for our JNDI registry. Our test case looks like the following.
>> >>>
>> >>>   @Test
>> >>>   public void testRollbackAfterEnqueue() throws Exception {
>> >>>     adviceRoute();
>> >>>     startCamelContext();
>> >>>     initMocks();
>> >>>     final RecordList cases = casesA();
>> >>>
>> >>>     mockEnd.expectedMessageCount(2);
>> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>> >>>     mockDead.expectedMessageCount(1);
>> >>>     mockDead.expectedBodiesReceived(cases.get(0));
>> >>>     mockBeforeToQueue.expectedMessageCount(3);
>> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>>     mockOutbox.expectedMessageCount(3);
>> >>>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>> >>>     mockAfterToQueue.expectedMessageCount(3);
>> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>>
>> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>>
>> >>>     mockBeforeToQueue.assertIsSatisfied();
>> >>>     mockAfterToQueue.assertIsSatisfied();
>> >>>     mockEnd.assertIsSatisfied();
>> >>>     mockDead.assertIsSatisfied();
>> >>>     mockOutbox.assertIsSatisfied();
>> >>>   }
>> >>>
>> >>> In this route the goal is that if any exception is thrown, even after
>> >>> the message is enqueued, the enqueue should be rolled back, the message
>> >>> that caused the exception should go to the DLQ, and the messages in the
>> >>> outbox should be rolled back, but the message from the inbox should not
>> >>> be put back on the queue. This is proving to be a bit of a juggling act.
>> >>>
>> >>> I tried putting markRollbackOnly() in the exception handler after the
>> >>> to(dead), but that rolled back the dead letter queue and the outbox and
>> >>> then redelivered the inbox message. Removing the markRollbackOnly() means
>> >>> that the message arrives at dead and is off the inbox, but it doesn't
>> >>> get removed from the outbox.
>> >>>
>> >>> So I am looking for the right combination of calls to accomplish what
>> >>> I want to do. Any suggestions?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>>
>> >>
>> >>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cibsen@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/
