Return-Path: X-Original-To: apmail-camel-issues-archive@minotaur.apache.org Delivered-To: apmail-camel-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EF16C10B30 for ; Fri, 20 Sep 2013 06:17:59 +0000 (UTC) Received: (qmail 74518 invoked by uid 500); 20 Sep 2013 06:17:59 -0000 Delivered-To: apmail-camel-issues-archive@camel.apache.org Received: (qmail 74087 invoked by uid 500); 20 Sep 2013 06:17:52 -0000 Mailing-List: contact issues-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list issues@camel.apache.org Received: (qmail 74072 invoked by uid 99); 20 Sep 2013 06:17:51 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 06:17:51 +0000 Date: Fri, 20 Sep 2013 06:17:51 +0000 (UTC) From: "Joshua Groom (JIRA)" To: issues@camel.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Created] (CAMEL-6771) ConcurrentModificationException thrown from inside camel splitter MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 Joshua Groom created CAMEL-6771: ----------------------------------- Summary: ConcurrentModificationException thrown from inside camel splitter Key: CAMEL-6771 URL: https://issues.apache.org/jira/browse/CAMEL-6771 Project: Camel Issue Type: Bug Components: camel-core Affects Versions: 2.11.1 Environment: Amazon Linux, oracle jdk 1.7 Reporter: Joshua Groom We use camel 2.11.1 running on the oracle 1.7 jvm for linux. I have a route that looks like this. It reads in files and puts them on a seda queue with 8 concurrent consumers. - The SpatialInterpolationPojo reads each file is read and split into two messages X and Y. - The MyAggregator uses X and Y together and outputs a combined message A.B - The MySplitterPojo splits A.B into two messages A and B from("file://somefile") .to("seda:filteraccept?concurrentConsumers=8"); from("seda:filteraccept?concurrentConsumers=8") .split() .method(new SpatialInterpolationPojo(), "split") .to("direct:wind-aggregator"); from("direct:wind-aggregator") .aggregate(packageCorrelationId(), new MyAggregator()) .completionPredicate(header(FIELD_AGGREGATION_COMPLETE).isNotNull()) .split() .method(new MySplitterPojo()) .to("seda:output"); The MySplitterPojo simply returns List containing two messages that come from data in the input message body. We copy the body headers to the result messages. It is thread safe, it has no state, ie there are no object fields that are modified. The method is like this it is edited for clarity/privacy: public class MySplitterPojo { public List splitMessage( @Headers Map headers, @Body CombinedObject body) { DefaultMessage a = new DefaultMessage(); a.setBody(body.getA()); a.setHeaders(new HashMap(headers)); DefaultMessage b = new DefaultMessage(); b.setBody(body.getB()); b.setHeaders(new HashMap(headers)); ArrayList result = new ArrayList(2); result.add(a); result.add(b); return result; } } When we run this route we very occasionally get the exception below. You can see that it is entirely within camel, it appears to be trying to copy the map stored under the exchange property Exchange.AGGREGATION_STRATEGY which is a camel internal property key. By inspection of the message I can see that Exchange has just come out of the WindVectorAggregator. This seems like it must be a camel bug to me. Any ideas? 15 Sep 2013 23:06:47,140[Camel (camel-1) thread #21 - seda://filteraccept] WARN AggregateProcessor Error processing aggregated exchange. Exchange[Message: { Trondheim, NO=WindVector [u=-5.92894983291626, v=7.060009002685547], ... }]. Caused by: [java.util.ConcurrentModificationException - null] java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextEntry(Unknown Source) at java.util.HashMap$EntryIterator.next(Unknown Source) at java.util.HashMap$EntryIterator.next(Unknown Source) at java.util.HashMap.putAllForCreate(Unknown Source) at java.util.HashMap.(Unknown Source) at org.apache.camel.processor.MulticastProcessor.setAggregationStrategyOnExchange(MulticastProcessor.java:1011) at org.apache.camel.processor.Splitter.process(Splitter.java:95) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86) at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:495) at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62) at java.util.concurrent.AbstractExecutorService.submit(Unknown Source) at org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:487) at org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:471) at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:325) at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:229) at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122) at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504) at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:151) at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:285) at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251) at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122) at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504) at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213) at org.apache.camel.processor.Splitter.process(Splitter.java:98) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:294) at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203) at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:150) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira