camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christian Müller <christian.muel...@gmail.com>
Subject Re: Problem with consumerTemplate and transacted (activeMq) routes
Date Wed, 07 Mar 2012 08:11:23 GMT
Good morning Claus,
:o)

I had no luck so far.
I also tested this route with the latest ActiveMQ 5.5-SNAPSHOT and
5.6-SNAPSHOT versions and it still failed.

Best,
Christian

On Wed, Mar 7, 2012 at 9:06 AM, Claus Ibsen <claus.ibsen@gmail.com> wrote:

> Ah a typo, its of course you -> I.
>
> Christian did you have luck finding the ticket(s)?
>
> I remember there was an attachment with a sample unit test that
> demonstrated the issue.
> And it was reported a fair long time ago, like 1-2 years.
> I think it was working in AMQ 5.3 but failed when upgrading to 5.4 or
> it was 5.4 -> 5.5.
>
>
>
> On Tue, Mar 6, 2012 at 3:05 PM, Claus Ibsen <claus.ibsen@gmail.com> wrote:
> > On Tue, Mar 6, 2012 at 2:43 PM, Claus Ibsen <claus.ibsen@gmail.com>
> wrote:
> >> Its not Camel. There is a JIRA ticket in AMQ about this.
> >>
> >
> > Crap you gotta be some sort of imbecile to use  the ASF JIRA. Have
> > been trying for 20 min to find the good damn JIRA ticket about this.
> > It was first reported at Camel and then moved to AMQ as an issue.
> >
> >
> >
> >>
> >> On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller
> >> <christian.mueller@gmail.com> wrote:
> >>> We have this issue by running Apache Camel 2.6.0. I verified that this
> issue
> >>> still exists with the brand new 2.9.1 version.
> >>> We use Java 1.6.0_29 and are able to reproduce it on MacOS and Windows
> 2000.
> >>>
> >>> Please have a look at the route definition below to follow the
> explanation.
> >>> In route-2 we send the received messages into the queue "aggregate" and
> >>> afterwards to an aggregator. The aggregator aggregate messages into
> batches
> >>> of fife. If fife messages are aggregated, the following processor read
> all
> >>> messages from queue "aggregate", aggregate the content and send it to
> a mock
> >>> endpoint. This works fine as long we don't use transacted routes. But
> >>> because we don't want to lose messages (in reality we use a pesistent
> >>> aggregation repository), we switched to transacted routes. With this
> change,
> >>> our processor (which use the Camel consumerTemplate) only receives one
> >>> message in the while loop. We checked the queue and there are four
> other
> >>> messages which are not read. We tried multiple different combinations
> of
> >>> connectionFactory settings, cacheLevelName settings, ... but without
> luck.
> >>>
> >>> Any idea what we did wrong or what Camel does wrong?
> >>>
> >>> The route:
> >>> @Override
> >>> public void configure() throws Exception {
> >>>     from("direct:start").routeId("route-1")
> >>>         .inOnly("activemq:queue:start");
> >>>
> >>>     from("activemqTx:queue:start").routeId("route-2")
> >>>         .transacted("required")
> >>>         .inOnly("activemqTx:queue:aggregate")
> >>>         .aggregate(header("aggregationGroup"), new
> >>> BodyInAggregatingStrategy()).completionSize(5)
> >>>         .process(new Processor() {
> >>>             public void process(Exchange ex) throws Exception {
> >>>
> >>> log.info("################################################");
> >>>                 log.info("Consume from 'activemq:queue:aggregate'
> after
> >>> aggregation completed");
> >>>
> >>> log.info("################################################");
> >>>
> >>>                 StringBuilder builder = new StringBuilder();
> >>>                 Exchange exchange = null;
> >>>
> >>>                 while ((exchange =
> >>> consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) {
> >>>
> builder.append(exchange.getIn().getBody(String.class));
> >>>                     consumerTemplate.doneUoW(exchange);
> >>>                 }
> >>>
> >>>
> >>> log.info("################################################");
> >>>                 log.info("no more messages to consume from queue
> >>> 'activemq:queue:aggregate'");
> >>>
> >>> log.info("################################################");
> >>>
> >>>                 ex.getIn().setBody(builder.toString());
> >>>             }
> >>>         })
> >>>         .inOnly("mock:end");
> >>> }
> >>>
> >>> The test:
> >>> public void test() throws InterruptedException {
> >>>     end.expectedBodiesReceived("12345");
> >>>
> >>>     template.sendBodyAndHeader("direct:start", "1", "aggregationGroup",
> >>> "A");
> >>>     template.sendBodyAndHeader("direct:start", "2", "aggregationGroup",
> >>> "A");
> >>>     template.sendBodyAndHeader("direct:start", "3", "aggregationGroup",
> >>> "A");
> >>>     template.sendBodyAndHeader("direct:start", "4", "aggregationGroup",
> >>> "A");
> >>>     template.sendBodyAndHeader("direct:start", "5", "aggregationGroup",
> >>> "A");
> >>>
> >>>     assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
> >>> }
> >>>
> >>> Please find attached my Eclipse project which contains the unit test.
> >>>
> >>> Best,
> >>> Christian
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> FuseSource
> >> Email: cibsen@fusesource.com
> >> Web: http://fusesource.com
> >> Twitter: davsclaus, fusenews
> >> Blog: http://davsclaus.blogspot.com/
> >> Author of Camel in Action: http://www.manning.com/ibsen/
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > FuseSource
> > Email: cibsen@fusesource.com
> > Web: http://fusesource.com
> > Twitter: davsclaus, fusenews
> > Blog: http://davsclaus.blogspot.com/
> > Author of Camel in Action: http://www.manning.com/ibsen/
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message