camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Problem with consumerTemplate and transacted (activeMq) routes
Date Wed, 07 Mar 2012 08:06:00 GMT
Ah a typo, its of course you -> I.

Christian did you have luck finding the ticket(s)?

I remember there was an attachment with a sample unit test that
demonstrated the issue.
And it was reported a fair long time ago, like 1-2 years.
I think it was working in AMQ 5.3 but failed when upgrading to 5.4 or
it was 5.4 -> 5.5.



On Tue, Mar 6, 2012 at 3:05 PM, Claus Ibsen <claus.ibsen@gmail.com> wrote:
> On Tue, Mar 6, 2012 at 2:43 PM, Claus Ibsen <claus.ibsen@gmail.com> wrote:
>> Its not Camel. There is a JIRA ticket in AMQ about this.
>>
>
> Crap you gotta be some sort of imbecile to use  the ASF JIRA. Have
> been trying for 20 min to find the good damn JIRA ticket about this.
> It was first reported at Camel and then moved to AMQ as an issue.
>
>
>
>>
>> On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller
>> <christian.mueller@gmail.com> wrote:
>>> We have this issue by running Apache Camel 2.6.0. I verified that this issue
>>> still exists with the brand new 2.9.1 version.
>>> We use Java 1.6.0_29 and are able to reproduce it on MacOS and Windows 2000.
>>>
>>> Please have a look at the route definition below to follow the explanation.
>>> In route-2 we send the received messages into the queue "aggregate" and
>>> afterwards to an aggregator. The aggregator aggregate messages into batches
>>> of fife. If fife messages are aggregated, the following processor read all
>>> messages from queue "aggregate", aggregate the content and send it to a mock
>>> endpoint. This works fine as long we don't use transacted routes. But
>>> because we don't want to lose messages (in reality we use a pesistent
>>> aggregation repository), we switched to transacted routes. With this change,
>>> our processor (which use the Camel consumerTemplate) only receives one
>>> message in the while loop. We checked the queue and there are four other
>>> messages which are not read. We tried multiple different combinations of
>>> connectionFactory settings, cacheLevelName settings, ... but without luck.
>>>
>>> Any idea what we did wrong or what Camel does wrong?
>>>
>>> The route:
>>> @Override
>>> public void configure() throws Exception {
>>>     from("direct:start").routeId("route-1")
>>>         .inOnly("activemq:queue:start");
>>>
>>>     from("activemqTx:queue:start").routeId("route-2")
>>>         .transacted("required")
>>>         .inOnly("activemqTx:queue:aggregate")
>>>         .aggregate(header("aggregationGroup"), new
>>> BodyInAggregatingStrategy()).completionSize(5)
>>>         .process(new Processor() {
>>>             public void process(Exchange ex) throws Exception {
>>>
>>> log.info("################################################");
>>>                 log.info("Consume from 'activemq:queue:aggregate'
after
>>> aggregation completed");
>>>
>>> log.info("################################################");
>>>
>>>                 StringBuilder builder = new StringBuilder();
>>>                 Exchange exchange = null;
>>>
>>>                 while ((exchange =
>>> consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) {
>>>                     builder.append(exchange.getIn().getBody(String.class));
>>>                     consumerTemplate.doneUoW(exchange);
>>>                 }
>>>
>>>
>>> log.info("################################################");
>>>                 log.info("no more messages to consume from queue
>>> 'activemq:queue:aggregate'");
>>>
>>> log.info("################################################");
>>>
>>>                 ex.getIn().setBody(builder.toString());
>>>             }
>>>         })
>>>         .inOnly("mock:end");
>>> }
>>>
>>> The test:
>>> public void test() throws InterruptedException {
>>>     end.expectedBodiesReceived("12345");
>>>
>>>     template.sendBodyAndHeader("direct:start", "1", "aggregationGroup",
>>> "A");
>>>     template.sendBodyAndHeader("direct:start", "2", "aggregationGroup",
>>> "A");
>>>     template.sendBodyAndHeader("direct:start", "3", "aggregationGroup",
>>> "A");
>>>     template.sendBodyAndHeader("direct:start", "4", "aggregationGroup",
>>> "A");
>>>     template.sendBodyAndHeader("direct:start", "5", "aggregationGroup",
>>> "A");
>>>
>>>     assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
>>> }
>>>
>>> Please find attached my Eclipse project which contains the unit test.
>>>
>>> Best,
>>> Christian
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Mime
View raw message