From: "Peter Keller (JIRA)"
To: issues@camel.apache.org
Date: Wed, 30 Nov 2016 21:01:01 +0000 (UTC)
Subject: [jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709764#comment-15709764 ]

Peter Keller commented on CAMEL-10272:
--------------------------------------

Alex, thank you very much for the analysis.
You are right: if an exception is thrown, then {{oldExchange}} may be {{null}}. As this may happen in our route, it is mysterious why the exception is thrown non-deterministically (of course, the root cause may be another multi-threading issue not directly related to Camel).

Note that if {{parallelAggregate()}} is used (as mentioned above, we don't use this setup), then {{oldExchange}} may also be {{null}}. This can be shown with a debugger by setting a breakpoint in {{doAggregateInternal}} on the line with {{ExchangeHelper.prepareAggregation}}. Shouldn't Camel prevent {{oldExchange}} from being {{null}} in this case?

Logging the (possible) exception thrown by the aggregation strategy would help to track down the cause. Of course, this could be done in the aggregation strategy implementation itself with a try/catch clause, without changing the Camel framework. Propagating the exception to the default error handler may break existing routes, which is not desirable. That said, I would personally prefer this option.

> Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
> ------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-10272
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10272
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.16.3, 2.17.3
>         Environment: MacOS 10.11.6, JRE 1.7.0_79
>            Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test that reproduces the problem. Furthermore, our (complex) unit tests are not deterministic because of the root cause of the problem.
> However, I analyzed the Camel Java code to work out the problem. My findings are below.
> h3. Problem
> {{oldExchange}} is {{null}} more than once in the aggregator if a recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
> from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once, which is not expected!
>     }
>     // ...
> {code}
> {{oldExchange}} is {{null}} more than once, which is not expected and contradicts the contract with Camel, under which {{oldExchange}} is {{null}} only on the first invocation.
> h3. Analysis
> During processing, Camel invokes {{MulticastProcessor.process()}}. Here the result object, an {{AtomicExchange}}, is created; it is shared throughout the whole processing.
> If the processing is done in parallel (as is the case for our route), then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one instance of {{AggregateOnTheFlyTask}} is initialized, and {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}} for *every* target in the recipient list- via {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees that this is done only once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is created, and if aggregation is not done in parallel (as is the case in our route), {{ParallelAggregateTask.run()}}, {{ParallelAggregateTask.doAggregate()}} (this method is {{synchronized}}), and {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> }
> {code}
> However, a race condition may occur in {{ParallelAggregateTask.doAggregateInternal()}} because {{result}} is shared -by every instance of {{AggregateOnTheFlyTask}}-, so that {{oldExchange = result.get()}} may be {{null}} more than once!
> Note: as a new instance of {{ParallelAggregateTask}} is created for every target in the recipient list, the {{synchronized}} method {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sound reasonable?
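The Note in the analysis argues that a {{synchronized}} method protects nothing when every target gets a fresh task instance, because each instance locks its own monitor. The plain-Java model below illustrates that general point; it is a sketch, not Camel code. {{Task}} is a hypothetical stand-in for {{ParallelAggregateTask}}, an {{AtomicReference<String>}} stands in for {{AtomicExchange}}, and string concatenation stands in for the aggregation strategy. It contrasts locking {{this}} (one monitor per instance) with locking one object shared by all tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class PerInstanceLockDemo {

    // Hypothetical stand-in for ParallelAggregateTask: one new instance per target.
    static final class Task implements Runnable {
        private final AtomicReference<String> result; // shared by all tasks, like AtomicExchange
        private final AtomicInteger nullSeen;         // how often "oldExchange == null" was observed
        private final Object sharedLock;              // null means: rely on the synchronized method only
        private final String value;

        Task(AtomicReference<String> result, AtomicInteger nullSeen, Object sharedLock, String value) {
            this.result = result;
            this.nullSeen = nullSeen;
            this.sharedLock = sharedLock;
            this.value = value;
        }

        @Override
        public void run() {
            if (sharedLock == null) {
                doAggregate();              // locks 'this': a different monitor for every task
            } else {
                synchronized (sharedLock) { // one monitor shared by every task
                    doAggregateInternal();
                }
            }
        }

        private synchronized void doAggregate() {
            doAggregateInternal();
        }

        // Read-modify-write on the shared result; only safe if callers share one lock.
        private void doAggregateInternal() {
            String old = result.get();
            if (old == null) {
                nullSeen.incrementAndGet();
            }
            Thread.yield();                 // widen the window between read and write
            result.set(old == null ? value : old + "," + value);
        }
    }

    // Returns {times null was observed, number of values in the final result}.
    static int[] run(boolean useSharedLock, int targets) {
        AtomicReference<String> result = new AtomicReference<>();
        AtomicInteger nullSeen = new AtomicInteger();
        Object lock = useSharedLock ? new Object() : null;
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < targets; i++) {
                futures.add(pool.submit(new Task(result, nullSeen, lock, "t" + i)));
            }
            for (Future<?> f : futures) {
                f.get();                    // wait for every task to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        String merged = result.get();
        int count = merged == null ? 0 : merged.split(",").length;
        return new int[] {nullSeen.get(), count};
    }

    public static void main(String[] args) {
        int[] perInstance = run(false, 1000);
        int[] shared = run(true, 1000);
        System.out.println("per-instance lock: null seen " + perInstance[0] + "x, kept " + perInstance[1] + " values");
        System.out.println("shared lock:       null seen " + shared[0] + "x, kept " + shared[1] + " values");
    }
}
```

With the shared lock, exactly one task observes {{null}} and all 1000 values survive. With per-instance locking, runs typically show {{null}} observed more than once and lost updates, but this is timing-dependent and not guaranteed on every run, which matches the non-deterministic behavior described in the issue.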
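The comment suggests logging a possible exception inside the aggregation strategy itself with a try/catch clause, so the cause can be traced without changing Camel. A minimal sketch of that idea, under stated assumptions: to stay self-contained it wraps a {{java.util.function.BinaryOperator}} (Java 8) instead of Camel's {{AggregationStrategy}}, and {{LoggingAggregation}} and {{logged}} are hypothetical names. In a real route, the same try/catch would sit inside {{MyAggregationStrategy.aggregate(Exchange, Exchange)}}. The wrapper logs both inputs and then rethrows, so it adds diagnostics without changing how the failure propagates.

```java
import java.util.function.BinaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

class LoggingAggregation {
    private static final Logger LOG = Logger.getLogger(LoggingAggregation.class.getName());

    // Hypothetical decorator: delegates to the real aggregation, logs any failure
    // together with both inputs, then rethrows so the caller's behavior is unchanged.
    static <T> BinaryOperator<T> logged(BinaryOperator<T> delegate) {
        return (oldValue, newValue) -> {
            try {
                return delegate.apply(oldValue, newValue);
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING,
                        "Aggregation failed (oldValue=" + oldValue + ", newValue=" + newValue + ")", e);
                throw e;
            }
        };
    }

    public static void main(String[] args) {
        // The first call sees oldValue == null, mirroring the first aggregate() invocation.
        BinaryOperator<String> concat = logged((a, b) -> a == null ? b : a + "," + b);
        System.out.println(concat.apply(concat.apply(null, "x"), "y")); // x,y

        BinaryOperator<String> failing = logged((a, b) -> {
            throw new IllegalStateException("boom");
        });
        try {
            failing.apply("x", "y");
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage()); // rethrown: boom
        }
    }
}
```

Whether to rethrow, swallow, or return a fallback is a policy choice; rethrowing keeps existing routes behaving as before, which addresses the concern about breaking them.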