camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Joshua Groom (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CAMEL-6771) ConcurrentModificationException thrown from inside camel splitter
Date Fri, 20 Sep 2013 06:17:51 GMT
Joshua Groom created CAMEL-6771:
-----------------------------------

             Summary: ConcurrentModificationException thrown from inside camel splitter
                 Key: CAMEL-6771
                 URL: https://issues.apache.org/jira/browse/CAMEL-6771
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.11.1
         Environment: Amazon Linux, oracle jdk 1.7
            Reporter: Joshua Groom



We use camel 2.11.1 running on the oracle 1.7 jvm for linux.

I have a route that looks like this. It reads in files and puts them on a seda queue with
8 concurrent consumers. 
- The SpatialInterpolationPojo reads each file is read and split into two messages X and Y.

- The MyAggregator uses X and Y together and outputs a combined message A.B 
- The MySplitterPojo splits A.B into two messages A and B 

from("file://somefile") 
    .to("seda:filteraccept?concurrentConsumers=8"); 

from("seda:filteraccept?concurrentConsumers=8") 
    .split() 
    .method(new SpatialInterpolationPojo(), "split") 
    .to("direct:wind-aggregator"); 

from("direct:wind-aggregator") 
    .aggregate(packageCorrelationId(), new MyAggregator()) 
    .completionPredicate(header(FIELD_AGGREGATION_COMPLETE).isNotNull()) 
    .split() 
    .method(new MySplitterPojo()) 
    .to("seda:output"); 

The MySplitterPojo simply returns List<Message> containing two messages that come from
data in the input message body. We copy the body headers to the result messages. 

It is thread safe, it has no state, ie there are no object fields that are modified. 

The method is like this it is edited for clarity/privacy: 
public class MySplitterPojo {

 public List<Message> splitMessage( 
            @Headers Map<String, Object> headers, 
            @Body CombinedObject body) { 
    
    DefaultMessage a = new DefaultMessage(); 
    a.setBody(body.getA()); 
    a.setHeaders(new HashMap<String, Object>(headers)); 
            
    DefaultMessage b = new DefaultMessage(); 
    b.setBody(body.getB()); 
    b.setHeaders(new HashMap<String, Object>(headers)); 
  
    ArrayList<Message> result = new ArrayList<Message>(2); 
    result.add(a); 
    result.add(b); 
    
    return result; 
 } 
}

When we run this route we very occasionally get the exception below. You can see that it is
entirely within camel, it appears to be trying to copy the map stored under the exchange property
Exchange.AGGREGATION_STRATEGY which is a camel internal property key. 

By inspection of the message I can see that Exchange has just come out of the WindVectorAggregator.


This seems like it must be a camel bug to me. Any ideas? 

15 Sep 2013 23:06:47,140[Camel (camel-1) thread #21 - seda://filteraccept] WARN AggregateProcessor
Error processing aggregated exchange. Exchange[Message: { Trondheim, NO=WindVector [u=-5.92894983291626,
v=7.060009002685547], ... }]. Caused by: [java.util.ConcurrentModificationException - null]

java.util.ConcurrentModificationException 
        at java.util.HashMap$HashIterator.nextEntry(Unknown Source) 
        at java.util.HashMap$EntryIterator.next(Unknown Source) 
        at java.util.HashMap$EntryIterator.next(Unknown Source) 
        at java.util.HashMap.putAllForCreate(Unknown Source) 
        at java.util.HashMap.<init>(Unknown Source) 
        at org.apache.camel.processor.MulticastProcessor.setAggregationStrategyOnExchange(MulticastProcessor.java:1011)

        at org.apache.camel.processor.Splitter.process(Splitter.java:95) 
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)

        at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:495)

        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
        at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) 
        at java.util.concurrent.FutureTask.run(Unknown Source) 
        at org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)

        at java.util.concurrent.AbstractExecutorService.submit(Unknown Source) 
        at org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:487)

        at org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:471)

        at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:325)

        at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:229)

        at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)

        at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)

        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)

        at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:151)

        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:285) 
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251) 
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161) 
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) 
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) 
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)

        at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)

        at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) 
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) 
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)

        at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)

        at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)

        at org.apache.camel.processor.Splitter.process(Splitter.java:98) 
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)

        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)

        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) 
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) 
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)

        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)

        at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)

        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)

        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)

        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)

        at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:294)

        at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203) 
        at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:150) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
        at java.lang.Thread.run(Unknown Source)


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message