camel-dev mailing list archives

From blove319
Subject SJMS RejectedExecutionException bug during shutdown
Date Wed, 26 Apr 2017 16:18:57 GMT
Camel versions tested: 2.16 - 2.18.3
Current Maven dependencies:
org.apache.camel:camel-test 2.18.1
org.apache.camel:camel-sjms 2.17.3

When SJMS (and possibly other components) is used with an aggregator and/or
splitter in the route, shutting down either throws an error or discards
messages.

Specifically, when an SJMS consumer reads from a queue and the route contains
an aggregator, I inevitably lose messages when the route stops.

The two obvious documented aggregator modifiers do not work:

*forceCompletionOnStop* - results in a RejectedExecutionException error
because the underlying thread pools are stopped/closed before the
"prepareShutdown" method is called on the aggregator (which is when the
outstanding aggregations are forced to complete and the results are handed
to the route for processing).
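
The underlying JDK behaviour can be reproduced without Camel. The following is a minimal sketch under the assumption that the route's pools behave like a standard ExecutorService with the default rejection policy; the class and method names are hypothetical and none of this is Camel's own code. It only shows that a task handed to a pool after shutdown() is rejected:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Plain-JDK illustration; none of these names come from Camel.
class RejectedAfterShutdownDemo {

    // Returns true if a task submitted after shutdown() is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // The pool is closed first, as the post suspects happens to the route's pools.
        pool.shutdown();
        try {
            // Stand-in for the aggregator handing a force-completed batch to the route.
            pool.submit(() -> System.out.println("processing completed aggregation"));
            return false;
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: a shut-down pool refuses new work.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
    }
}
```

If the aggregator's forced completion really runs after the pools are closed, this is the exception one would expect to see.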

*completeAllOnStop* - results in the route logging the number of outstanding
messages every second (the number never changes) until the (500 second?)
timeout is reached, at which point the route is forced to shut down and the
messages are discarded, presumably because there is no active thread pool
available to handle them.

Without either of these two modifiers on the aggregator, it just tosses out
any unfinished aggregations on shutdown.
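
For contrast with the failure modes described above, here is a rough sketch of the ordering I would expect: pending batches are handed to the pool first, and only then is the pool shut down and drained. This is a plain-JDK illustration with hypothetical names, not Camel's shutdown logic and not a proposed patch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration; none of these names come from Camel.
class FlushBeforeShutdownDemo {

    // Submits every pending batch, then shuts the pool down and waits for it to drain.
    static List<String> flushThenShutdown(List<String> pendingBatches) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> processed = new CopyOnWriteArrayList<>();
        for (String batch : pendingBatches) {
            // Stand-in for forcing an outstanding aggregation to complete.
            pool.submit(() -> { processed.add(batch); });
        }
        // shutdown() stops new submissions but still runs tasks already queued.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(flushThenShutdown(Arrays.asList("batch-1", "batch-2")));
    }
}
```

With this ordering nothing is rejected and nothing is dropped, because the pool is still accepting work when the outstanding batches are submitted.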

Here's a sample test... It probably isn't ideally written, but it does
illustrate the issue...

import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.toolbox.AggregationStrategies;
import org.junit.Rule;
import org.junit.Test;

/**
 * Created by on 4/25/17.
 */
public class SjmsBatchTest {
    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();
    CamelContext context = new DefaultCamelContext();
    ProducerTemplate template = context.createProducerTemplate();

    @Test
    public void testBatch() throws Exception {
        SjmsComponent comp = new SjmsComponent();
        context.addComponent("sjms", comp);
        //context.setShutdownStrategy(new MyShutdownStrategy(context));

        RouteBuilder rb = new RouteBuilder() {
            public void configure() throws Exception {
                    .completionInterval(10000)   // wait $b
                    .completionSize(50)          // wait for $batchSize messages to aggregate
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception

        template.sendBodyAndHeader("some body", "CamelFileName",
