camel-users mailing list archives

From Tom Fornoville <tom.fornovi...@roots.be>
Subject Re: Grouping messages for batch processing
Date Tue, 22 Oct 2013 14:48:30 GMT
Thanks, Claus and Erwin, for the answers. I'm starting to understand the
patterns a little better.

I am, however, still not sure how to handle the use case with only one
aggregator.
As Claus suggested, I'm now sending an extra message after all the
organisations are split.

The routes now look something like this:

<route>
        <from uri="file:src/data?noop=true" />
        <setHeader headerName="transactionGUID">
                <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
        </setHeader>
        <split>
                <tokenize token="organisation" xml="true" />
                <to uri="activemq:organisation.queue" />
        </split>
        <setBody>
                <simple>LAST_LINE</simple>
        </setBody>
        <to uri="activemq:organisation.queue" />
</route>

<route id="process-organisations">
        <from uri="activemq:organisation.queue" />
        <aggregate strategyRef="organisationAggregator" completionSize="50"
                        completionTimeout="5000">
                <correlationExpression>
                        <header>transactionGUID</header>
                </correlationExpression>
                <bean ref="organisationServiceActivator" method="updateOrganisations" />
        </aggregate>
        <choice>
                <when>
                        <simple>${body} == 'LAST_LINE'</simple>
                        <bean ref="transactionServiceActivator" method="updateTransaction" />
                </when>
        </choice>
</route>

The only problem now is that transactionServiceActivator.updateTransaction is
called before the last batch of organisations is updated whenever the number
of organisations is not an exact multiple of 50.
For example, starting with a file that has 120 organisations, I get the
following:
 * update organisations 1-50
 * update organisations 51-100
 * update transaction
 * update organisations 101-120
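
The ordering can be reproduced with a toy model of the route, presumably because full batches are released as soon as completionSize is reached while the incomplete last batch is only released once completionTimeout expires. This is a rough sketch in plain Java, not Camel code: the class and method names are made up for illustration, and it assumes messages are consumed one at a time in order, that the timeout only fires after the queue has gone quiet, and it ignores the fact that the LAST_LINE marker itself also enters the aggregator.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two routes above; hypothetical names, no Camel involved. */
class BatchOrderSimulation {

    /**
     * Returns the order of bean calls for a file with `total` organisations,
     * assuming in-order, single-threaded consumption from the queue.
     */
    static List<String> simulate(int total, int completionSize) {
        List<String> log = new ArrayList<>();
        int pending = 0;      // organisations waiting in the open batch
        int firstPending = 1; // number of the first organisation in that batch

        for (int i = 1; i <= total; i++) {
            pending++;
            if (pending == completionSize) {
                // completionSize reached: the batch is released immediately
                log.add("update organisations " + firstPending + "-" + i);
                firstPending = i + 1;
                pending = 0;
            }
        }

        // The LAST_LINE marker reaches the <choice> right away and matches.
        log.add("update transaction");

        // Only afterwards does completionTimeout release the incomplete batch.
        if (pending > 0) {
            log.add("update organisations " + firstPending + "-" + total);
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(120, 50).forEach(System.out::println);
    }
}
```

With 120 organisations and a batch size of 50, the model lists the transaction update before the final batch of 101-120; with an exact multiple such as 100 it comes last.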

Tom Fornoville
Senior Developer
m: +32 478 65 86 51
www.roots.be


On Tue, Oct 22, 2013 at 3:01 PM, Erwin Etchart <erwin.etchart@gmail.com> wrote:

> Just to help: remember (it is a mistake that I made) that every aggregate
> needs its own repository (I lost a few hours to this).
>
> Regards.
>
>
> 2013/10/22 Tom.Fornoville <tom.fornoville@roots.be>
>
> > Hi everyone,
> >
> > I have the following use-case:
> > * messages arrive on a queue, 1 message for each organisation to be
> > created/updated, messages that belong to the same transaction (= our own
> > term, NOT a JPA transaction or something like that) all have the same
> > transactionGUID
> >
> > * we need to group organisations together in batches of 50 to process
> > them, the last batch can be smaller
> >
> > * during processing extra entities can be added to the queue: updates for
> > an enterprise can trigger updates for establishments under that enterprise
> >
> > * after processing by the bean we need to update the transaction entity
> > with statistics (# updated, # created etc...)
> >
> > I was thinking of doing this with 2 aggregators like this:
> >
> > <route id="process-organisations">
> >         <from uri="activemq:organisation.update.queue" />
> >         <aggregate strategyRef="organisationAggregator"
> > completionSize="50" completionTimeout="5000">
> >                 <correlationExpression>
> >                         <header>transactionGUID</header>
> >                 </correlationExpression>
> >                 <bean ref="organisationServiceActivator"
> > method="updateOrganisations" />
> >                 <to uri="activemq:organisation.processed.queue" />
> >         </aggregate>
> > </route>
> >
> > <route id="aggregate-organisation-results">
> >         <from uri="activemq:organisation.processed.queue" />
> >         <aggregate strategyRef="?">
> >                 <correlationExpression>
> >                         <header>?</header>
> >                 </correlationExpression>
> >                 <completionPredicate>
> >                         <simple>?</simple>
> >                 </completionPredicate>
> >                 <bean ref="transactionServiceActivator"
> > method="updateTransaction" />
> >         </aggregate>
> > </route>
> >
> > The questions I have:
> > * are 2 routes with aggregators the best way to handle this use case?
> > * will the first aggregate keep working or will it stop after the first
> > batch of 50?
> > * what should be the correlation and completion parameters for the second
> > aggregation?
> >
> > Thanks in advance,
> > Tom
> >
> > --
> > View this message in context:
> > http://camel.465427.n5.nabble.com/Grouping-messages-for-batch-processing-tp5742015.html
> > Sent from the Camel - Users mailing list archive at Nabble.com.
> >
>
