Date: Fri, 2 Dec 2016 17:08:58 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@camel.apache.org
Reply-To: dev@camel.apache.org
Subject: [jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

    [ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715672#comment-15715672 ]

ASF GitHub Bot commented on CAMEL-10272:
----------------------------------------

Github user asfgit closed the pull request
at: https://github.com/apache/camel/pull/1326

> Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
> ------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-10272
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10272
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.16.3, 2.17.3
>         Environment: MacOS 10.11.6, JRE 1.7.0_79
>            Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test that reproduces the problem. Furthermore, our (complex) unit tests are not deterministic due to the root cause of the problem.
> However, I analyzed the Camel Java code to work out the problem. My findings are below.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
> from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once which is not expected!
>     }
>     // ...
> {code}
> {{oldExchange}} is {{null}} more than once, which is not expected and which contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here the result object ({{AtomicExchange}}) is created; it is shared during the whole processing.
> If the processing should be done in parallel (as is the case for our route), then {{MulticastProcessor.doProcessParallel()}} is invoked.
> Here one instance of {{AggregateOnTheFlyTask}} is initialized and {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}} for *every* target in the recipient list- via {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees that this is done only once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is generated, and if aggregation is not done in parallel (as is the case in our route), {{ParallelAggregateTask.run()}}, {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> }
> {code}
> However, a race condition may occur in {{ParallelAggregateTask.doAggregateInternal()}} because {{result}} is shared -by every instance of {{AggregateOnTheFlyTask}}-, so that {{oldExchange = result.get()}} may be {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} is created for every target in the recipient list, the {{synchronized}} method {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sound reasonable?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
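
The note above about {{synchronized}} can be illustrated outside Camel. The following is a minimal sketch, not Camel's code and not necessarily the fix that was merged: {{SharedResultAggregation}}, {{AggregateTask}} and {{runTasks}} are hypothetical names, and a {{String}} stands in for an {{Exchange}}. It assumes, as the report describes, one task instance per target and one shared result holder. Because each task locks on the shared holder rather than on itself, the get/aggregate/set sequence cannot interleave, and the "old" value is {{null}} for the first reply only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SharedResultAggregation {

    /** Hypothetical per-target task; a stand-in, not Camel's ParallelAggregateTask. */
    static final class AggregateTask implements Runnable {
        private final AtomicReference<String> result; // shared by all task instances
        private final AtomicInteger nullOldCount;     // counts how often "old" was null
        private final String reply;

        AggregateTask(AtomicReference<String> result, AtomicInteger nullOldCount, String reply) {
            this.result = result;
            this.nullOldCount = nullOldCount;
            this.reply = reply;
        }

        @Override
        public void run() {
            // Lock on the shared holder, not on `this`: a synchronized instance
            // method would give every task its own monitor and exclude nothing.
            synchronized (result) {
                String old = result.get();
                if (old == null) {
                    nullOldCount.incrementAndGet();
                }
                result.set(old == null ? reply : old + "," + reply);
            }
        }
    }

    /** Runs n tasks on a small pool; returns {null-old count, aggregated reply count}. */
    static int[] runTasks(int n) {
        AtomicReference<String> result = new AtomicReference<>();
        AtomicInteger nullOldCount = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < n; i++) {
                pool.submit(new AggregateTask(result, nullOldCount, "reply" + i));
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        String merged = result.get();
        int aggregated = merged == null ? 0 : merged.split(",").length;
        return new int[] {nullOldCount.get(), aggregated};
    }

    public static void main(String[] args) {
        int[] stats = runTasks(8);
        System.out.println("old == null seen " + stats[0] + " time(s), "
                + stats[1] + " replies aggregated");
    }
}
```

Replacing the {{synchronized (result)}} block with a {{synchronized}} instance method on {{AggregateTask}} would reintroduce the window described in the report, since two tasks could both read {{null}} before either writes.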