camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Zhemzhitsky Sergey <>
Subject Async processing and number of inflight exchanges issue
Date Fri, 20 Jan 2012 07:43:37 GMT
Hi gurus,

I have the  following issue. I need to send requests to webservice from camel on timer events.
As timer consumer uses synchronous event firing , only one request can be sent at the same
time. What I’d like is use to threads DSL to process multiple timer events at the same time.

Here is an example:

    .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)

However, usage of threads DSL with original timer consumer does not make sense because of
synchronicity of timer consumer, so I tried to develop asynchronous timer component that uses
asynchronous API to fire events.

Everything works fine except that there is a lot of inflight exchanges when stopping the route.

Could you please suggest how to avoid such an issue?


Below is the unit test that allows to reproduce this unexpected behavior.


import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.junit.Test;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CamelRoutingTest extends CamelTestSupport {

    public void route() throws Exception {
        Endpoint endpoint = context().getRoute("timer").getConsumer().getEndpoint();

        // make timer fire for several times

        context().stopRoute("timer", 10, TimeUnit.SECONDS);

        int inflightExchanges = context().getInflightRepository().size(endpoint);
        assertTrue("inflightExchanges: " + inflightExchanges, inflightExchanges == 0);

    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();
        registry.bind("atimer", new AsyncTimerComponent());
        return registry;

    protected int getShutdownTimeout() {
        return 1;

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() throws Exception {
                        .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "START: ${header." + Exchange.BREADCRUMB_ID + "}")
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "COMPLETE: ${header." + Exchange.BREADCRUMB_ID + "}")

    public static class AsyncTimerComponent extends DefaultComponent {
        private final Map<String, Timer> timers = new HashMap<String, Timer>();
        public Timer getTimer(AsyncTimerEndpoint endpoint) {
            String key = endpoint.getTimerName();
            if (!endpoint.isDaemon()) {
                key = "nonDaemon:" + key;

            Timer answer;
            synchronized (timers) {
                answer = timers.get(key);
                if (answer == null) {
                    answer = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
                    timers.put(key, answer);
            return answer;

        protected AsyncTimerEndpoint createEndpoint(String uri, String remaining, Map<String,
Object> parameters) throws Exception {
            AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, remaining);
            setProperties(answer, parameters);
            return answer;

        protected void doStop() throws Exception {
            Collection<Timer> collection = timers.values();
            for (Timer timer : collection) {

    public static class AsyncTimerEndpoint extends DefaultEndpoint implements Service {
        private String timerName;
        private long period = 1000;
        private long delay;
        private boolean fixedRate;
        private boolean daemon = true;
        private Timer timer;
        private long repeatCount;

        public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent component, String timerName)
            super(fullURI, component);
            this.timerName = timerName;
        public Producer createProducer() throws Exception {
            throw new RuntimeCamelException("There is no producer");

        public AsyncTimerConsumer createConsumer(Processor processor) throws Exception {
            return new AsyncTimerConsumer(this, processor);

        public void start() throws Exception {
            // noop

        public void stop() throws Exception {

        public String getTimerName() {
            if (timerName == null) {
                timerName = getEndpointUri();
            return timerName;

        public void setTimerName(String timerName) {
            this.timerName = timerName;

        public boolean isDaemon() {
            return daemon;

        public void setDaemon(boolean daemon) {
            this.daemon = daemon;

        public long getDelay() {
            return delay;

        public void setDelay(long delay) {
            this.delay = delay;

        public boolean isFixedRate() {
            return fixedRate;

        public void setFixedRate(boolean fixedRate) {
            this.fixedRate = fixedRate;

        public long getPeriod() {
            return period;

        public void setPeriod(long period) {
            this.period = period;

        public long getRepeatCount() {
            return repeatCount;

        public void setRepeatCount(long repeatCount) {
            this.repeatCount = repeatCount;

        public boolean isSingleton() {
            return true;

        public synchronized Timer getTimer() {
            if (timer == null) {
                AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
                timer = tc.getTimer(this);
            return timer;

        public synchronized void setTimer(Timer timer) {
            this.timer = timer;

        public String getEndpointUri() {
            return super.getEndpointUri();

    public static class AsyncTimerConsumer extends DefaultConsumer {

        private final AsyncTimerEndpoint endpoint;
        private volatile TimerTask task;

        public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor processor) {
            super(endpoint, processor);
            this.endpoint = endpoint;

        protected void doStart() throws Exception {
            task = new TimerTask() {
                // counter
                private final AtomicLong counter = new AtomicLong();

                public void run() {
                    try {
                        long count = counter.incrementAndGet();

                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
                        if (fire) {
                        } else {
                            // no need to fire anymore as we exceeded repeat count
                            log.debug("Cancelling {} timer as repeat count limit reached after
{} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
                    } catch (Throwable e) {
                        // catch all to avoid the JVM closing the thread and not firing again
                        log.warn("Error processing exchange. This exception will be ignored,
to let the timer be able to trigger again.", e);

            Timer timer = endpoint.getTimer();
            configureTask(task, timer);

        protected void doStop() throws Exception {
            if (task != null) {
            task = null;

        protected void configureTask(TimerTask task, Timer timer) {
            if (endpoint.isFixedRate()) {
                timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
            } else {
                if (endpoint.getPeriod() > 0) {
                    timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
                } else {
                    timer.schedule(task, endpoint.getDelay());

        protected void sendTimerExchange(long counter) {
            Exchange exchange = endpoint.createExchange();
            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
            exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());

            log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
            try {
                AsyncProcessorHelper.process(getAsyncProcessor(), exchange, new AsyncCallback()
                    public void done(boolean doneSync) {
                        // noop

                // log exception if an exception occurred and was not handled
                if (exchange.getException() != null) {
                    getExceptionHandler().handleException("Error processing exchange", exchange,
            } catch (Exception e) {
                getExceptionHandler().handleException("Error processing exchange", exchange,


Best Regards,
Sergey Zhemzhitsky


The information contained in this message may be privileged and conf idential and protected
from disclosure. If you are not the original intended recipient, you are hereby notified that
any review, retransmission, dissemination, or other use of, or taking of any action in reliance
upon, this information is prohibited. If you have received this communication in error, please
notify the sender immediately by replying to this message and delete it from your computer.
Thank you for your cooperation. Troika Dialog, Russia. 
If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message