camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joshua Groom <joshua.gr...@metservice.com>
Subject ConcurrentModificationException thrown from inside camel splitter
Date Thu, 19 Sep 2013 00:56:37 GMT
Hi,
I have a route that looks like this. It reads in files and puts them on a
seda queue with 8 concurrent consumers.
- The SpatialInterpolationPojo reads each file is read and split into two
messages X and Y. 
- The WindVectorAggregator puts X and Y back together and outputs a combined
message A.B
- The WindSplitterPojo splits A.B into two messages A and B

from("file://somefile")
    .to("seda:filteraccept?concurrentConsumers=8");

from("seda:filteraccept?concurrentConsumers=8")
    .split()
    .method(new SpatialInterpolationPojo(), "split")
    .to("direct:wind-aggregator");

from("direct:wind-aggregator")
    .aggregate(packageCorrelationId(), new WindVectorAggregator())
    .completionPredicate(header(FIELD_AGGREGATION_COMPLETE).isNotNull())
    .split()
    .method(new WindSplitterPojo())
    .to("seda:output");

When we run this route we very occasionally get the exception below. You can
see that it is entirely within camel, it appears to be trying to copy the
map stored under the exchange property Exchange.AGGREGATION_STRATEGY which
is a camel internal property key.

By inspection of the message I can see that Exchange has just come out of
the WindVectorAggregator.

This seems like it must be a camel bug to me. Any ideas?

15 Sep 2013 23:06:47,140[Camel (camel-1) thread #21 - seda://filteraccept]
WARN AggregateProcessor Error processing aggregated exchange.
Exchange[Message: { Trondheim, NO=WindVector [u=-5.92894983291626,
v=7.060009002685547], ... }]. Caused by:
[java.util.ConcurrentModificationException - null]
java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextEntry(Unknown Source)
	at java.util.HashMap$EntryIterator.next(Unknown Source)
	at java.util.HashMap$EntryIterator.next(Unknown Source)
	at java.util.HashMap.putAllForCreate(Unknown Source)
	at java.util.HashMap.<init>(Unknown Source)
	at
org.apache.camel.processor.MulticastProcessor.setAggregationStrategyOnExchange(MulticastProcessor.java:1011)
	at org.apache.camel.processor.Splitter.process(Splitter.java:95)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
	at
org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:495)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at
org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
	at java.util.concurrent.AbstractExecutorService.submit(Unknown Source)
	at
org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:487)
	at
org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:471)
	at
org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:325)
	at
org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:229)
	at
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
	at
org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)
	at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)
	at
org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:151)
	at
org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:285)
	at
org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
	at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
	at
org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
	at
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)
	at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)
	at
org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)
	at org.apache.camel.processor.Splitter.process(Splitter.java:98)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
	at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
	at
org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at
org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:294)
	at
org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203)
	at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:150)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)



--
View this message in context: http://camel.465427.n5.nabble.com/ConcurrentModificationException-thrown-from-inside-camel-splitter-tp5739793.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message