camel-issues mailing list archives

From "Peter Keller (JIRA)" <>
Subject [jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
Date Wed, 30 Nov 2016 21:01:01 GMT


Peter Keller commented on CAMEL-10272:

Alex, thank you very much for the analysis. 

You are right: if an exception is thrown, then {{oldExchange}} may be {{null}}. Although this
may be the case in our route, it is still unclear why the exception is thrown
non-deterministically (of course, the root cause may be another multi-threading issue not
directly related to Camel).
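
To illustrate the point above, here is a minimal sketch of the read/aggregate/write step. It does not use Camel: an {{AtomicReference<String>}} stands in for {{AtomicExchange}} and a {{BinaryOperator<String>}} stands in for the aggregation strategy, and the class name {{ReadAggregateWriteSketch}} is hypothetical. It only shows that when the strategy throws, nothing is written back, so the next call sees {{null}} again.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;

// Hypothetical stand-in types: String replaces Exchange, BinaryOperator replaces
// the aggregation strategy. This is not Camel code.
final class ReadAggregateWriteSketch {
    private final AtomicReference<String> result = new AtomicReference<>();
    private int nullOldCount = 0;

    // Same shape as doAggregateInternal: read the old value, aggregate, write back.
    void aggregate(BinaryOperator<String> strategy, String incoming) {
        String old = result.get();
        if (old == null) {
            nullOldCount++;
        }
        // If strategy.apply throws, set() is never reached and result stays as it was.
        result.set(strategy.apply(old, incoming));
    }

    String result() {
        return result.get();
    }

    // Number of calls in which the strategy was handed a null old value.
    int nullOldCount() {
        return nullOldCount;
    }
}
```

With a strategy that never throws, the old value is {{null}} only on the first call. If the first call throws, the second call receives {{null}} as well, which matches the behaviour Alex described.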

Note that if {{parallelAggregate()}} is used (as mentioned above, we don't use this setup), then
{{oldExchange}} may also be {{null}}. This can be shown by setting a breakpoint in a debugger
in {{doAggregateInternal}} on the line with {{ExchangeHelper.prepareAggregation}}. Shouldn't
Camel prevent {{oldExchange}} from being {{null}} in this case?

Logging the (possible) exception thrown by the aggregation strategy could help to track
down the cause. Of course, this could be done in the implementation of the aggregation strategy
itself with a try/catch clause, without changing the Camel framework.
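
A minimal sketch of such a try/catch wrapper follows. To keep it self-contained it does not implement Camel's aggregation strategy interface: a {{BinaryOperator<T>}} stands in for the strategy, the class name {{LoggingAggregator}} is hypothetical, and failures are recorded in a list where a real route would presumably use its logger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical wrapper: BinaryOperator<T> stands in for the aggregation strategy.
final class LoggingAggregator<T> implements BinaryOperator<T> {
    private final BinaryOperator<T> delegate;
    // Stand-in for a logger so that the sketch has no external dependencies.
    private final List<String> failures = Collections.synchronizedList(new ArrayList<>());

    LoggingAggregator(BinaryOperator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T apply(T oldValue, T newValue) {
        try {
            return delegate.apply(oldValue, newValue);
        } catch (RuntimeException e) {
            // Record the failure together with both inputs, then rethrow so the
            // caller's behaviour is unchanged.
            failures.add("aggregation failed (old=" + oldValue + ", new=" + newValue + "): " + e);
            throw e;
        }
    }

    // Snapshot of the recorded failure messages.
    List<String> failures() {
        synchronized (failures) {
            return new ArrayList<>(failures);
        }
    }
}
```

Because the exception is rethrown, the wrapper only adds visibility; it does not change how the exception is handled further up.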

Propagating the exception to the default error handler may break existing routes, which is not
desirable. That said, I personally would prefer this option.

> Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
> ------------------------------------------------------------------------------------------
>                 Key: CAMEL-10272
>                 URL:
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.16.3, 2.17.3
>         Environment: MacOS 10.11.6, JRE 1.7.0_79
>            Reporter: Peter Keller
> Unfortunately, I am not able to provide a (simple) unit test that reproduces the problem. Furthermore, our (complex) unit tests are not deterministic due to the root cause of the problem.
> However, I analyzed the Camel Java code to work out the problem. Please find my findings below.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a recipient list
is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
>  from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once which is not expected!
>     }
>     // ...
> {code}
> {{oldExchange}} is {{null}} more than once, which is not expected and which contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here the result object {{AtomicExchange}} is created, which is shared during the whole processing.
> If the processing is to be done in parallel (as is the case for our route), then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one instance of {{AggregateOnTheFlyTask}} is initialized and {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for *every* target in the recipient list- via {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees that this is only done once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is generated, and if aggregation is not done in parallel (as is the case in our route), {{}}, {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} a race condition may occur, as {{result}} is shared -by every instance of {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} is created for every target in the recipient list, the {{synchronized}} method {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sound reasonable?
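
The general Java point behind the note in the quoted description (a {{synchronized}} instance method locks on {{this}}, so separate instances do not exclude each other) can be sketched as follows. This is plain Java, not Camel code, and it makes no claim about whether Camel actually runs these tasks concurrently; the names {{PerInstanceLockDemo}} and {{Task}} are hypothetical. Two threads meet at a {{CyclicBarrier}} inside the synchronized method, which is only possible if both are inside it at the same time.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PerInstanceLockDemo {

    // Hypothetical task whose synchronized method locks on the Task instance itself.
    static final class Task {
        private final CyclicBarrier barrier;

        Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        // Returns true only if a second thread reached the barrier while this
        // thread was still inside the synchronized method.
        synchronized boolean enter() {
            try {
                barrier.await(2, TimeUnit.SECONDS);
                return true;
            } catch (BrokenBarrierException | TimeoutException e) {
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Runs enter() on both tasks from two threads and reports whether they overlapped.
    private static boolean bothInside(Task first, Task second) {
        boolean[] met = new boolean[2];
        Thread t1 = new Thread(() -> met[0] = first.enter());
        Thread t2 = new Thread(() -> met[1] = second.enter());
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return met[0] && met[1];
    }

    // Two instances, two monitors: both threads can be inside enter() together.
    static boolean separateInstancesOverlap() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        return bothInside(new Task(barrier), new Task(barrier));
    }

    // One instance, one monitor: the second thread waits, so the barrier times out.
    static boolean sharedInstanceOverlaps() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Task shared = new Task(barrier);
        return bothInside(shared, shared);
    }
}
```

In this sketch, mutual exclusion only appears when both threads go through the same instance; a lock shared by all tasks would be needed to serialize a read/aggregate/write step across instances.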
