camel-users mailing list archives

From Björn Bength <>
Subject Re: Ruthgard check out completionFromBatchConsumer and groupExchanges options on the Aggregator
Date Sun, 20 Feb 2011 22:01:33 GMT
Well, it's me who tried to help Mr Ruthgard, but we ended up with only
very ugly workarounds in our little session.
He faced a problem we have also been seeing with aggregation but have
not had the time to fully investigate, so I'm basically curious how to
fix the issue.
If I may paste code into this mail, I can give an outline of the problem:

       from("file:...")  // polling file consumer; endpoint options were not in the original mail
           .aggregate(header(Exchange.FILE_PARENT),  // correlate by parent folder (assumed from the description below)
               new GroupedExchangeAggregationStrategy()).completionTimeout(1000)
           .process(new Processor() {
               public void process(Exchange exchange) throws Exception {
                   // Exchange oldExchange = ...  (truncated in the original)
                   // setHeader("CamelFileName", simple("[builds a custom file
                   // name based on correlation_id from custom component]"))
               }
           })
           // .delay(5000)
           .end()
           .to("custom:...");  // custom component endpoint (name elided)

(we call "end()" on the aggregate to get the same effect as the
xml-config of the same route)
It basically polls files and groups them by parent folder into a grouped
exchange property, then sends that exchange off to a custom component
whose endpoint packs those files into a special MIME format and sends
them over the network (reading the file content as it streams it over
the wire).
As if that were not enough, we need to move the sent files to a
directory/filename based on a correlation id taken from the grouped
send from the custom component, so that all grouped files end up together.
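Stripped of the Camel plumbing, the grouping step is just a map from parent directory to the files under it. A minimal stand-alone sketch (class name and file paths are made up for illustration):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByParent {
    // Group file paths by their parent directory, mirroring what the
    // aggregator does when it collects exchanges into a grouped property.
    public static Map<String, List<String>> group(List<String> paths) {
        Map<String, List<String>> groups = new HashMap<String, List<String>>();
        for (String path : paths) {
            String parent = new File(path).getParent();
            List<String> bucket = groups.get(parent);
            if (bucket == null) {
                bucket = new ArrayList<String>();
                groups.put(parent, bucket);
            }
            bucket.add(path);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<String>();
        paths.add("inbox/a/one.txt");
        paths.add("inbox/a/two.txt");
        paths.add("inbox/b/three.txt");
        Map<String, List<String>> groups = group(paths);
        System.out.println(groups.get("inbox/a").size()); // 2
        System.out.println(groups.get("inbox/b").size()); // 1
    }
}
```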

So the problem:
The file endpoint is blazingly fast, handing the files to the
aggregator, and then each FILE exchange falls through to the end and is
DONE. The file is removed, and the custom component cannot read a
removed file when it gets around to sending it (because it is not that fast).

Ruthgard has a streamCache though, which pre-reads the files before
aggregating, so small files (up to 64 kB) fit in memory and can be sent
accordingly.
Bigger files end up in a temp file, which also seems to be removed
when the file exchange is done.
The requirement to handle many big concurrent files makes it less
suitable to simply raise that 64 kB limit, too.
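The behaviour Ruthgard sees matches a typical spool-threshold design: buffer in memory up to a limit, overflow to a temp file whose lifetime is tied to the exchange. A generic sketch of that pattern (the class name and threshold handling are illustrative, not Camel's actual stream-cache implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Buffers bytes in memory until a threshold, then spools to a temp file.
// The temp file must survive until every consumer of the cached stream
// is done -- exactly the lifetime problem described above.
public class SpoolingBuffer extends OutputStream {
    private final int threshold;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private File spoolFile;           // null while still in memory
    private OutputStream current;

    public SpoolingBuffer(int threshold) {
        this.threshold = threshold;
        this.current = memory;
    }

    @Override
    public void write(int b) throws IOException {
        if (spoolFile == null && memory.size() + 1 > threshold) {
            spoolFile = File.createTempFile("spool", ".tmp");
            OutputStream fileOut = new FileOutputStream(spoolFile);
            memory.writeTo(fileOut);  // move the in-memory bytes to disk
            memory = null;
            current = fileOut;
        }
        current.write(b);
    }

    public boolean isSpooledToDisk() {
        return spoolFile != null;
    }

    // Deleting too early is the bug in the scenario above: the sender
    // still needs the file. Cleanup must wait for the whole group.
    public void cleanup() throws IOException {
        current.close();
        if (spoolFile != null) {
            spoolFile.delete();
        }
    }
}
```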

The easiest way to "fix" this is to add a delay to the file exchange,
but one might also put noop=true on the file endpoint and manually move
the files to their final archive directory using a custom processor.
Error handling in this scenario is also going to be fun.
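With noop=true the consumer leaves the originals in place, and a custom processor can move each file once the correlation id is known. A stand-alone sketch of just the move step (class name and directory layout are assumptions, not something from the actual route):

```java
import java.io.File;
import java.io.IOException;

public class ArchiveMover {
    // Move a sent file into an archive directory named after the
    // correlation id, so all files of one group end up together.
    public static File moveToArchive(File file, File archiveRoot,
            String correlationId) throws IOException {
        File targetDir = new File(archiveRoot, correlationId);
        if (!targetDir.isDirectory() && !targetDir.mkdirs()) {
            throw new IOException("cannot create " + targetDir);
        }
        File target = new File(targetDir, file.getName());
        if (!file.renameTo(target)) {
            // renameTo can fail across filesystems; a real implementation
            // would fall back to copy + delete here.
            throw new IOException("cannot move " + file + " to " + target);
        }
        return target;
    }
}
```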

Is there a better way, perhaps delaying a file exchange's "done" until
the aggregated exchange containing that exchange is finished?
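One way to get that "delay done until the group is finished" semantics, sketched outside Camel with plain reference counting: each file registers a cleanup action with its group, and the cleanups only run when the last member completes. This illustrates the idea only; it is not an existing Camel option:

```java
import java.util.ArrayList;
import java.util.List;

// Defers per-file cleanup until every file in the group has completed,
// mimicking "delay the file exchange's done until the aggregated
// exchange is finished".
public class GroupCompletion {
    private int pending;
    private final List<Runnable> cleanups = new ArrayList<Runnable>();

    // Each file exchange registers its deferred cleanup (e.g. delete
    // or move of the consumed file) when it joins the group.
    public synchronized void register(Runnable cleanup) {
        pending++;
        cleanups.add(cleanup);
    }

    // Called when one member finishes; the last caller runs all cleanups.
    public synchronized void memberDone() {
        if (--pending == 0) {
            for (Runnable r : cleanups) {
                r.run();
            }
            cleanups.clear();
        }
    }
}
```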

While reading the sources of the aggregate processor I discovered a
boolean flag named "parallelProcessing" which seems spot on, but it is
not used even once in the sources I looked at.


On Sat, Feb 19, 2011 at 8:13 AM, Claus Ibsen <> wrote:
> I was just looking at the logs:
> Maybe those options could help him with his "I want to group together
> all the files I consumed from the file consumer in the poll" use case.
> --
> Claus Ibsen
> -----------------
> FuseSource
> Twitter: davsclaus
> Author of Camel in Action:
