Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 80596 invoked from network); 22 Nov 2010 10:38:17 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 22 Nov 2010 10:38:17 -0000 Received: (qmail 35163 invoked by uid 500); 22 Nov 2010 10:38:48 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 35108 invoked by uid 500); 22 Nov 2010 10:38:47 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 35099 invoked by uid 99); 22 Nov 2010 10:38:47 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Nov 2010 10:38:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Nov 2010 10:38:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C5161238899C; Mon, 22 Nov 2010 10:37:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1037666 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/SedaConsumer.java test/java/org/apache/camel/processor/ShutdownDeferTest.java Date: Mon, 22 Nov 2010 10:37:14 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101122103714.C5161238899C@eris.apache.org> Author: davsclaus Date: Mon Nov 22 10:37:14 2010 New Revision: 1037666 URL: http://svn.apache.org/viewvc?rev=1037666&view=rev Log: CAMEL-3348: Fixed issue with seda consumer in rare condition may lose exchange during graceful shutdown on high loaded systems. Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1037666&r1=1037665&r2=1037666&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Nov 22 10:37:14 2010 @@ -16,11 +16,10 @@ */ package org.apache.camel.component.seda; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -36,7 +35,6 @@ import org.apache.camel.processor.Multic import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,6 +46,10 @@ import org.apache.commons.logging.LogFac public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class); + // use a task counter to help ensure we can graceful shutdown the seda consumers without + // causing any exchanges to be lost due a tiny loophole between the exchange is polled + // and when its registered as in flight exchange + private final AtomicInteger tasks = new AtomicInteger(); private SedaEndpoint endpoint; private AsyncProcessor processor; private ExecutorService executor; @@ -90,24 +92,35 @@ public class SedaConsumer extends Servic public int getPendingExchangesSize() { // number of pending messages on the queue - return endpoint.getQueue().size(); + int answer = endpoint.getQueue().size(); + if (answer == 0) { + // if there are no pending exchanges we at first must ensure that + // all tasks has been completed and the thread is stopped, to avoid + // any condition which otherwise would cause an exchange to be lost + + // we think there are 0 pending exchanges but we are only 100% sure + // when all the running tasks has been shutdown, so they do not + // somehow have polled an Exchange which we otherwise may loose + // due the Exchange takes a little while before its enlisted in the + // in flight registry (to let Camel know there is an Exchange in progress) + answer = tasks.get(); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Pending exchanges " + answer); + } + return answer; } public void run() { BlockingQueue queue = endpoint.getQueue(); while (queue != null && isRunAllowed()) { - final Exchange exchange; + Exchange exchange = null; try { exchange = queue.poll(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); - } - continue; - } - if (exchange != null) { - if (isRunAllowed()) { + if (exchange != null) { try { + tasks.incrementAndGet(); sendToConsumers(exchange); // log exception if an exception occurred and was not handled @@ -116,22 +129,27 @@ public class SedaConsumer extends Servic } } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); + } finally { + tasks.decrementAndGet(); } + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); + } + continue; + } catch (Throwable e) { + if (exchange != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); } else { - if (LOG.isWarnEnabled()) { - LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange); - } - try { - queue.put(exchange); - } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); - } - continue; - } + getExceptionHandler().handleException(e); } } } + + if (LOG.isDebugEnabled()) { + LOG.debug("Ending this polling consumer thread, there are still " + tasks.get() + " threads left."); + } } /** @@ -175,6 +193,8 @@ public class SedaConsumer extends Servic } protected void doStart() throws Exception { + tasks.set(0); + int poolSize = endpoint.getConcurrentConsumers(); executor = endpoint.getCamelContext().getExecutorServiceStrategy() .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); @@ -187,7 +207,9 @@ public class SedaConsumer extends Servic protected void doStop() throws Exception { endpoint.onStopped(this); // must shutdown executor on stop to avoid overhead of having them running - endpoint.getCamelContext().getExecutorServiceStrategy().shutdown(executor); + // use shutdown now to force the tasks which are polling for new exchanges + // to stop immediately to avoid them picking up new exchanges arriving in the mean time + endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); executor = null; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java?rev=1037666&r1=1037665&r2=1037666&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java Mon Nov 22 10:37:14 2010 @@ -50,8 +50,8 @@ public class ShutdownDeferTest extends C context.stop(); - // should route about 4 - 5 (in some rare cases it will only route 4) - assertTrue("Should complete all messages, was " + bar.getReceivedCounter(), bar.getReceivedCounter() >= 4); + // should route all 5 + assertEquals(5, bar.getReceivedCounter()); } @Override