camel-users mailing list archives

From Andrew Chandler <>
Subject Best Strategy - aggregation
Date Mon, 01 Mar 2010 16:40:35 GMT
Hi there. With Claus's help I've been able to get most of the way to
where I need to be. Right now I'm doing a proof of concept with string
payloads; in the end, however, the payload will be an object. Here's what
I'm attempting:

I have an incoming message that contains an identifier as well as (N)
things to do against it. The (N) things can be done in parallel, so we
split the message on the (N) things. Here's where it gets tricky:
- The first of the (N) things to report success should be sent on, and
the rest of them should be aborted. The success should be forwarded
immediately, without waiting for any timeouts.
- If none of them reports success, we should aggregate until all (N)
things have reported failure and then forward a single negative result.
- Since the (N) things have their own built-in timeouts, it would be
nice not to have to deal with batchTimeout on the aggregator.
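The rules above amount to a completion condition that does not depend on any timeout: complete on the first success, or once all N items have failed. A minimal sketch in plain Java with no Camel dependency; the class `FirstSuccessOrAllFailed`, its methods and the `String` results are hypothetical, and hooking it into a Camel aggregator is left out because the right hook depends on the Camel version.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not a Camel class): tracks the results for one incoming
// message and decides when the group is "done" under the rules listed above.
class FirstSuccessOrAllFailed {
    private final int expected;                            // N, the number of split items
    private final List<String> failures = new ArrayList<String>();
    private String success;                                // first successful result, if any
    private boolean complete;

    FirstSuccessOrAllFailed(int expected) {
        this.expected = expected;
    }

    // Records one result; returns true exactly once, at the moment the group completes.
    synchronized boolean record(boolean ok, String result) {
        if (complete) {
            return false;                                  // late arrival after completion: ignore
        }
        if (ok) {
            success = result;                              // first success completes immediately
            complete = true;
        } else {
            failures.add(result);
            complete = failures.size() == expected;        // every one of the N items failed
        }
        return complete;
    }

    synchronized boolean succeeded() {
        return success != null;
    }

    synchronized String success() {
        return success;
    }

    synchronized List<String> failures() {
        return Collections.unmodifiableList(new ArrayList<String>(failures));
    }
}
```

With N = 3, a failure followed by a success completes the group on the second call; a third, late result is ignored rather than producing a second outgoing message.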

With my prototype I can successfully split the message and process the
split items using a ThreadPoolExecutor. I then aggregate them with
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy()).
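For comparison, outside Camel the "first success wins, otherwise wait until every attempt has failed" rule is what `ExecutorService.invokeAny` in `java.util.concurrent` already does: it returns the result of a task that completed without throwing, cancels the tasks that have not finished, and throws `ExecutionException` only if no task succeeds. A minimal sketch of those semantics, not of the Camel route itself; `FirstSuccessRunner`, `attempt` and the `NEGATIVE` result strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration using plain java.util.concurrent, not the Camel splitter/aggregator.
class FirstSuccessRunner {

    // Runs all attempts in parallel and returns a successful result as soon as one exists.
    // invokeAny cancels the attempts still running once one succeeds, and throws
    // ExecutionException only after every attempt has failed.
    static String firstSuccess(List<Callable<String>> attempts, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            return pool.invokeAny(attempts);
        } catch (ExecutionException allFailed) {
            return "NEGATIVE: all " + attempts.size() + " attempts failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return "NEGATIVE: interrupted";
        } finally {
            pool.shutdownNow();                 // release the pool's threads
        }
    }

    // Builds an attempt that "works" for the given milliseconds, then succeeds or throws.
    static Callable<String> attempt(final String name, final long millis, final boolean ok) {
        return new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(millis);
                if (!ok) {
                    throw new IllegalStateException(name + " failed");
                }
                return name;
            }
        };
    }
}
```

A fast success returns without waiting for slower siblings, while a set of attempts that all fail yields one negative result only after the last failure.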

Assume each of the split items has a built-in 5-second timeout on its
work.

Without .batchTimeout(7000L), I was seeing two results from the
aggregator: one almost immediately for the successful result, and a
second aggregated message containing all the failures about 4.5 seconds
later. When I added .batchTimeout(7000L) to the .aggregate clause, I got
a single message containing the success and the failures together. This
is close, but what I'm really asking is: how can I control, from inside
the aggregation, the decision to move forward? In the splitter I'm
already planning to include in each split object a shared object that
can be used to abort any of its sibling split objects, so I think I have
a handle on that.

Basically, I need the aggregator to control when processing continues
because, if we're going after, say, 60,000 things, being able to start
work on the successful ones after half a second instead of waiting 6 or
7 seconds for a batch timeout is significant. But I still have to
account for an entirely negative response in the event none of them
succeeds.
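The shared abort object mentioned above could be as small as one atomic flag that the first successful item claims and the others poll. A minimal sketch under that assumption; `SiblingAbort`, `SplitItem`, the step-based work loop and the result strings are all hypothetical, with `maxSteps` standing in for each item's built-in timeout.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shared object handed to every split item created from one incoming message.
class SiblingAbort {
    private final AtomicBoolean done = new AtomicBoolean(false);

    // Called by an item that has succeeded. compareAndSet lets exactly one caller win,
    // so only one success is forwarded even if two items finish at the same moment.
    boolean claimSuccess() {
        return done.compareAndSet(false, true);
    }

    // Polled by the other items between units of work so they can stop early.
    boolean shouldAbort() {
        return done.get();
    }
}

// Hypothetical split item that works in small steps and checks the shared flag.
class SplitItem {
    private final SiblingAbort abort;
    private final int successStep; // step at which this item succeeds; -1 means never

    SplitItem(SiblingAbort abort, int successStep) {
        this.abort = abort;
        this.successStep = successStep;
    }

    // Returns "SUCCESS", "ABORTED" (a sibling won first) or "FAILED" (own timeout reached).
    String run(int maxSteps) {
        for (int step = 0; step < maxSteps; step++) {
            if (abort.shouldAbort()) {
                return "ABORTED";
            }
            if (step == successStep) {
                return abort.claimSuccess() ? "SUCCESS" : "ABORTED";
            }
        }
        return "FAILED";
    }
}
```

Aborting this way is cooperative: an item only stops at its next check of `shouldAbort()`, so the granularity of those checks bounds how quickly siblings release their resources.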

I'm presently looking at creating my own AggregationCollection, as it
seems to let me see the size of the aggregated collection. If I can
somehow get the total number of items split, I can compare it with how
many have been aggregated to determine when I'm done. (I thought that
count was supposed to be in a header somewhere, but it doesn't seem to
be there.)
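Comparing "items split" with "items aggregated" can be sketched as a map of open groups keyed by correlation id. This assumes every split item carries the total split size; in Camel 2.x the splitter is understood to record it as an exchange property (`CamelSplitSize`, constant `Exchange.SPLIT_SIZE`) rather than a header, which may be why it does not show up among the headers, but that should be verified against the Camel version in use. `CountingAggregator` and its result strings are hypothetical and do not use Camel's AggregationCollection API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a custom aggregation collection (not Camel's API).
// Results are grouped by correlation id; each result carries the split size,
// assumed to be stamped on every item by the splitting step.
class CountingAggregator {
    private final Map<String, List<String>> failuresById = new HashMap<String, List<String>>();
    // Ids already forwarded; a real implementation would need to evict these eventually.
    private final Set<String> completedIds = new HashSet<String>();

    // Returns the message to forward now, or null if the group is still open.
    synchronized String add(String correlationId, int splitSize, boolean ok, String result) {
        if (completedIds.contains(correlationId)) {
            return null;                        // late sibling of an already forwarded group
        }
        if (ok) {
            completedIds.add(correlationId);
            failuresById.remove(correlationId);
            return "SUCCESS " + result;         // first success: forward without waiting
        }
        List<String> failures = failuresById.get(correlationId);
        if (failures == null) {
            failures = new ArrayList<String>();
            failuresById.put(correlationId, failures);
        }
        failures.add(result);
        if (failures.size() == splitSize) {     // aggregated count equals split count
            completedIds.add(correlationId);
            failuresById.remove(correlationId);
            return "NEGATIVE " + failures;      // one combined negative result
        }
        return null;
    }
}
```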

Any insights or redirects are appreciated.
