Message-ID: <14306601.8731290204206665.JavaMail.jira@thor>
Date: Fri, 19 Nov 2010 17:03:26 -0500 (EST)
From: "Damien Delautre (JIRA)"
To: camel-dev@activemq.apache.org
Reply-To: dev@camel.apache.org
Subject: [jira] Created: (CAMEL-3348) DefaultShutdownStrategy and ShutdownAware losing exchange

DefaultShutdownStrategy and ShutdownAware losing exchange
---------------------------------------------------------

                 Key: CAMEL-3348
                 URL: https://issues.apache.org/activemq/browse/CAMEL-3348
             Project: Apache Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.5.0
            Reporter: Damien Delautre

There is a problem when the Camel context is shut down while a seda endpoint is in use.
In the SedaConsumer, the exchange is removed from the queue and only later added to the InflightRepository, as shown in the following code (I put comments where each step is done):

{code}
public void run() {
    BlockingQueue<Exchange> queue = endpoint.getQueue();
    while (queue != null && isRunAllowed()) {
        final Exchange exchange;
        try {
            // The exchange is removed here from the queue
            exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
            }
            continue;
        }
        if (exchange != null) {
            if (isRunAllowed()) {
                try {
                    // Call to sendToConsumers detailed below
                    sendToConsumers(exchange);

                    if (exchange.getException() != null) {
                        getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                    }
                } catch (Exception e) {
                    getExceptionHandler().handleException("Error processing exchange", exchange, e);
                }
            } else {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                }
                try {
                    queue.put(exchange);
                } catch (InterruptedException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                    }
                    continue;
                }
            }
        }
    }
}

protected void sendToConsumers(Exchange exchange) throws Exception {
    int size = endpoint.getConsumers().size();

    if (size > 1) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
        }
        MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
        AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
            public void done(boolean doneSync) {
            }
        });
    } else {
        // This line will create the UnitOfWork (in UnitOfWorkProcessor),
        // which will put the exchange in the InflightRepository
        AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
            public void done(boolean doneSync) {
            }
        });
    }
}
{code}

If the shutdown occurs between these two actions, the DefaultShutdownStrategy will shut down the route even though a message is still in progress, and that message will be lost.

Here is the code of ShutdownTask in DefaultShutdownStrategy which causes the shutdown even if there are still messages in progress.
(I put comments in it to show the state of the seda queue and the InflightRepository if it is called between queue.poll() and InflightRepository.add().)

{code}
for (Consumer consumer : order.getInputs()) {
    // check the number of inflight exchanges, which is 0 because the UnitOfWork is not created yet
    int inflight = context.getInflightRepository().size(consumer.getEndpoint());
    if (consumer instanceof ShutdownAware) {
        // check the number of exchanges in the seda queue, which is 0 because the message is already removed
        inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
    }
    if (inflight > 0) {
        size += inflight;
        if (LOG.isDebugEnabled()) {
            LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
        }
    }
}
{code}

You can reproduce it by putting a breakpoint in the method
{code}protected void sendToConsumers(Exchange exchange){code}
in SedaConsumer and calling stop() on the CamelContext while the thread is suspended at the breakpoint.

We caught the problem in a unit test that tests the shutdown, at a time when our test server was under heavy load.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
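
The window described in this report (an exchange already polled from the queue but not yet registered as inflight) could be made visible to a pending-size check roughly as follows. This is only a sketch under stated assumptions, not Camel code and not a proposed patch: the class HandoffTrackingConsumer, its methods pollForProcessing() and handoffComplete(), and the use of String in place of Exchange are all hypothetical names introduced for illustration. Only the method name getPendingExchangesSize() is taken from the report.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a seda-style consumer; this is NOT Camel's SedaConsumer.
final class HandoffTrackingConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    // Exchanges polled from the queue but not yet tracked elsewhere; guarded by lock.
    private int inHandoff;

    void offer(String exchange) {
        queue.add(exchange);
    }

    // Removes the next exchange and counts it as "in handoff" in one atomic step,
    // so a concurrent size check never sees it in neither place.
    String pollForProcessing() {
        synchronized (lock) {
            String exchange = queue.poll(); // non-blocking, so the lock is held only briefly
            if (exchange != null) {
                inHandoff++;
            }
            return exchange;
        }
    }

    // To be called only after the exchange is tracked elsewhere (assumed here to be
    // the inflight registration) or has finished processing.
    void handoffComplete() {
        synchronized (lock) {
            if (inHandoff > 0) {
                inHandoff--;
            }
        }
    }

    // Queued exchanges plus those currently between poll and registration.
    int getPendingExchangesSize() {
        synchronized (lock) {
            return queue.size() + inHandoff;
        }
    }

    public static void main(String[] args) {
        HandoffTrackingConsumer consumer = new HandoffTrackingConsumer();
        consumer.offer("exchange-1");
        consumer.pollForProcessing();
        // Simulates a shutdown check landing inside the window from the report.
        System.out.println(consumer.getPendingExchangesSize()); // prints 1
        consumer.handoffComplete();
        System.out.println(consumer.getPendingExchangesSize()); // prints 0
    }
}
```

Note that a counter like this only helps if the shutdown check reads the pending size before the inflight size. Read in the opposite order, an exchange could still be registered and leave the handoff state between the two reads and be missed by both.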