camel-users mailing list archives

From Claus Ibsen <>
Subject Re: Aggregator lock
Date Wed, 18 Sep 2013 10:54:39 GMT

See the parallelProcessing / executorService option on the aggregator
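
A rough sketch of what that could look like on the route quoted below, assuming Camel 2.x package names. The class name, the constructor and the "mock:downstream" endpoint are placeholders of mine, not taken from the original route:

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy; // Camel 2.x package (assumption)

// Hypothetical route class; only the aggregate() options matter here.
public class ParallelAggregateRoute extends RouteBuilder {

    private final AggregationStrategy myAggregationStrategy;

    public ParallelAggregateRoute(AggregationStrategy myAggregationStrategy) {
        this.myAggregationStrategy = myAggregationStrategy;
    }

    @Override
    public void configure() {
        from("seda:foo?concurrentConsumers=2")
            .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
                // Hand completed aggregations to a thread pool instead of
                // processing them on the thread that completed the group.
                // executorService(...) can be used instead to supply your own pool.
                .parallelProcessing()
                .log("Sending out ${body} after a short pause...")
                .delay(3000) // simulate a lengthy process
                .log("Sending out ${body} imminently!")
                .to("mock:downstream"); // placeholder for the real downstream endpoint
    }
}
```

With this in place the two completed groups should no longer have to wait for each other, though I have not run this exact route.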

On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <> wrote:
> Hi,
> I'm seeing some surprising behaviour with my Camel route, and was hoping
> someone in this group could help, as my trawl through the docs and the Camel
> in Action book has not turned up the answers I'm looking for. Apologies if
> this question has been clearly answered elsewhere :-/
> I have a route that looks a little like the following:
>     from("seda:foo?concurrentConsumers=2")
>       .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>         .log("Sending out ${body} after a short pause...")
>         .delay(3000) // simulate a lengthy process
>         .log("Sending out ${body} imminently!")
>         .to(...) // other downstream processing
> Note that I'm using a SEDA endpoint with two *concurrent* consumers. I
> expected that once a SEDA consumer thread has picked up a message that
> completes an aggregation, downstream processing would continue on that
> consumer thread, whilst downstream processing for another 'completed
> aggregation' message could happen in parallel on the other SEDA consumer
> thread.
> What I'm finding instead is that whilst all of the work downstream of
> aggregate() does occur across the two consumer threads, it is serialised;
> no two threads execute the processors at the same time. This becomes quite
> noticeable if this downstream work is lengthy. I've uploaded a sample to
>, which you can run with mvn test
> -Dtest=AggregateLock. It started from a sample from the CIA book.
> For example, you can see that whilst the second "Sending... after a short
> pause" does occur on a separate thread (#2), it does not start until after
> thread #1 has completed, despite the 3s delay():
> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
> Is this behaviour expected? I found it _very_ surprising. Did I miss
> something in the docs that describes this behaviour? If the behaviour is
> expected, I am happy to try adding some info to the documentation if
> someone can explain the intent behind it.
> I'm not terribly familiar with the code, but I've had a dig around, and it
> looks like the cause of this behaviour is the following code inside the
> process() method of
> org.apache.camel.processor.aggregate.AggregateProcessor:
>             lock.lock();
>             try {
>                 doAggregation(key, copy);
>             } finally {
>                 lock.unlock();
>             }
> The doAggregation() method performs both the aggregation (i.e., adding the
> new exchange to the repository, checking if the completion criteria have
> been met etc) _and_, if complete, submits the aggregated message to the
> ExecutorService for downstream processing. However, since the default
> executorService is the SynchronousExecutorService, all downstream
> processing occurs synchronously with submission, and consequently, _within_
> the lock above.
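
The effect described above can be reproduced without Camel. The following is only a simulation using java.util.concurrent, not the AggregateProcessor code: the class and method names are made up, and a caller-runs Executor stands in for the synchronous executor. Two "consumer" threads each take a shared lock and hand a slow task to an executor while still holding it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; a simulation of "submit inside a lock", not Camel code.
class LockedSubmitDemo {

    // Stand-in for a synchronous executor: runs the task on the calling thread.
    static final Executor CALLER_RUNS = Runnable::run;

    // Returns the highest number of downstream tasks seen running at the same time.
    static int maxConcurrentDownstream(Executor downstream, long workMillis) {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger maxRunning = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(2);

        // The "lengthy downstream processing", comparable to delay(3000) in the route.
        final Runnable slowDownstream = () -> {
            maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                done.countDown();
            }
        };

        // Each consumer submits the downstream work while holding the lock.
        final Runnable consumer = () -> {
            lock.lock();
            try {
                downstream.execute(slowDownstream);
            } finally {
                lock.unlock();
            }
        };

        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(consumer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            done.await(); // wait for pooled tasks that outlive the consumers
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return maxRunning.get();
    }
}
```

Calling maxConcurrentDownstream(LockedSubmitDemo.CALLER_RUNS, 200) runs each task inside the lock, so at most one is ever active. Passing something like Executors.newFixedThreadPool(2) instead lets the lock be released right after submission, so the two tasks can overlap.
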
> Whilst I can see obvious reasons that may make it necessary to perform the
> actual aggregation inside a lock, I do find it quite surprising that the
> downstream processing by default also occurs inside this lock. Are there
> any other processors known to behave in this way, i.e., by taking a lock
> around all downstream processing?
> I could potentially work around this issue by dispensing with the SEDA
> concurrentConsumers and using aggregate().parallelProcessing() instead,
> with a suitable executorService() specified, but this introduces a number
> of complications, e.g.:
> - if I repeatedly split() and re-aggregate() (by different criteria), then
> _every time_ I aggregate I have to add
> parallelProcessing()/executorService(); this is verbose and error prone.
> - with repeated aggregates in a route, I need dedicated threads/pools per
> aggregate(), which means way more threads than I really want/need.
> - regardless, I don't get the simple, predictable behaviour I'd expected:
> 'pick up job from SEDA, aggregate, synchronously process downstream
> jobs'.
> Another possible workaround might be the optimistic locking, but I haven't
> had the opportunity to study it yet. It seems unrelated: I think my
> problem is with the very coarse granularity of the pessimistic lock, not
> with whether the lock is pessimistic or optimistic. Plus, I don't really
> want my messages to ever fail with a 'too many attempts to acquire the
> optimistic lock' exception, and I might have quite high contention.
> Many thanks in advance for your help/comments!
> Baris.

Claus Ibsen
Red Hat, Inc.
Twitter: davsclaus
Author of Camel in Action:
