camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r892587 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/...
Date Sun, 20 Dec 2009 12:27:05 GMT
Author: davsclaus
Date: Sun Dec 20 12:27:04 2009
New Revision: 892587

URL: http://svn.apache.org/viewvc?rev=892587&view=rev
Log:
CAMEL-1483: Camel now uses graceful shutdown when shutting down.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java   (with
props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sun Dec 20 12:27:04
2009
@@ -26,6 +26,7 @@
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.DataFormat;
+import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -39,6 +40,7 @@
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.ServicePool;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
 
 /**
@@ -486,6 +488,20 @@
     DataFormatDefinition resolveDataFormatDefinition(String name);
 
     /**
+     * Gets the current data format resolver
+     *
+     * @return the resolver
+     */
+    DataFormatResolver getDataFormatResolver();
+
+    /**
+     * Sets a custom data format resolver
+     *
+     * @param dataFormatResolver  the resolver
+     */
+    void setDataFormatResolver(DataFormatResolver dataFormatResolver);
+
+    /**
      * Sets the properties that can be referenced in the camel context
      *
      * @param properties properties
@@ -631,4 +647,19 @@
      * @param classLoader the class loader
      */
     void setApplicationContextClassLoader(ClassLoader classLoader);
+
+    /**
+     * Gets the current shutdown strategy
+     *
+     * @return the strategy
+     */
+    public ShutdownStrategy getShutdownStrategy();
+
+    /**
+     * Sets a custom shutdown strategy
+     *
+     * @param shutdownStrategy the custom strategy
+     */
+    void setShutdownStrategy(ShutdownStrategy shutdownStrategy);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Sun Dec 20 12:27:04 2009
@@ -26,6 +26,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.MulticastProcessor;
@@ -40,7 +41,7 @@
  *
  * @version $Revision$
  */
-public class SedaConsumer extends ServiceSupport implements Consumer, Runnable {
+public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware
{
     private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
 
     private SedaEndpoint endpoint;
@@ -78,6 +79,17 @@
         return processor;
     }
 
+    public boolean deferShutdown() {
+        // deny stopping on shutdown as we want seda consumers to run in case some other
queues
+        // depend on this consumer to run, so it can complete its exchanges
+        return false;
+    }
+
+    public int getPendingExchanges() {
+        // number of pending messages on the queue
+        return endpoint.getQueue().size();
+    }
+
     public void run() {
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         while (queue != null && isRunAllowed()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun
Dec 20 12:27:04 2009
@@ -80,6 +80,7 @@
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ServicePool;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.EventHelper;
@@ -147,6 +148,7 @@
     private InflightRepository inflightRepository = new DefaultInflightRepository();
     private final List<Consumer> routeStartupOrder = new ArrayList<Consumer>();
     private int defaultRouteStartupOrder = 1000;
+    private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
 
     public DefaultCamelContext() {
         super();
@@ -1069,6 +1071,7 @@
         forceLazyInitialization();
         startServices(components.values());
         addService(inflightRepository);
+        addService(shutdownStrategy);
 
         // To avoid initiating the routeDefinitions after stopping the camel context
         if (!routeDefinitionInitiated) {
@@ -1084,23 +1087,24 @@
         EventHelper.notifyCamelContextStopping(this);
 
         // stop route inputs in the same order as they was started so we stop the very first
inputs first
-        stopServices(getRouteStartupOrder(), false);
+        shutdownStrategy.shutdown(this, getRouteStartupOrder());
         getRouteStartupOrder().clear();
 
-        // the stop order is important
-        stopServices(endpoints.values());
-        endpoints.clear();
-
         stopServices(routeServices.values());
         // do not clear route services as we can start Camel again and get the route back
as before
-        stopServices(producerServicePool);
 
-        stopServices(components.values());
-        components.clear();
+        // the stop order is important
 
         stopServices(servicesToClose);
         servicesToClose.clear();
 
+        stopServices(endpoints.values());
+        endpoints.clear();
+
+        stopServices(components.values());
+        components.clear();
+
+        stopServices(producerServicePool);
         stopServices(inflightRepository);
 
         try {
@@ -1411,6 +1415,14 @@
         return dataFormatResolver.resolveDataFormatDefinition(name, this);
     }
 
+    public ShutdownStrategy getShutdownStrategy() {
+        return shutdownStrategy;
+    }
+
+    public void setShutdownStrategy(ShutdownStrategy shutdownStrategy) {
+        this.shutdownStrategy = shutdownStrategy;
+    }
+
     protected String getEndpointKey(String uri, Endpoint endpoint) {
         if (endpoint.isSingleton()) {
             return uri;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
Sun Dec 20 12:27:04 2009
@@ -38,7 +38,10 @@
     private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String,
AtomicInteger>();
 
     public void add(Exchange exchange) {
-        totalCount.incrementAndGet();
+        int count = totalCount.incrementAndGet();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Total " + count + " inflight exchanges. Last added: " + exchange.getExchangeId());
+        }
 
         if (exchange.getFromEndpoint() == null) {
             return;
@@ -52,7 +55,10 @@
     }
 
     public void remove(Exchange exchange) {
-        totalCount.decrementAndGet();
+        int count = totalCount.decrementAndGet();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Total " + count + " inflight exchanges. Last removed: " + exchange.getExchangeId());
+        }
 
         if (exchange.getFromEndpoint() == null) {
             return;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=892587&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
Sun Dec 20 12:27:04 2009
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.ShutdownStrategy;
+import org.apache.camel.util.EventHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Default shutdown strategy which supports graceful shutdown.
+ * <p/>
+ * Graceful shutdown ensures that any inflight and pending messages will be taken into account
+ * and it will wait until these exchanges have been completed.
+ * <p/>
+ * As this strategy will politely wait until all exchanges have been completed it can potentially
wait
+ * for a long time, which is why a timeout value can be set. When the timeout triggers you
can also
+ * specify whether the remaining consumers should be shut down now or ignored.
+ * <p/>
+ * Will by default use a timeout of 5 minutes by which it will shutdown now the remaining
consumers.
+ * This ensures that when shutting down Camel it at some point eventually will shutdown.
+ * This behavior can of course be configured using the {@link #setTimeout(long)} and
+ * {@link #setShutdownNowOnTimeout(boolean)} methods.
+ *
+ * @version $Revision$
+ */
+public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy {
+    private static final transient Log LOG = LogFactory.getLog(DefaultShutdownStrategy.class);
+
+    private ExecutorService executor;
+    private long timeout = 5 * 60;
+    private TimeUnit timeUnit = TimeUnit.SECONDS;
+    private boolean shutdownNowOnTimeout = true;
+
+    public void shutdown(CamelContext context, List<Consumer> consumers) throws Exception
{
+
+        long start = System.currentTimeMillis();
+
+        if (timeout > 0) {
+            LOG.info("Starting to graceful shutdown routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase()
+ ")");
+        } else {
+            LOG.info("Starting to graceful shutdown routes (no timeout)");
+        }
+
+        // use another thread to perform the shutdowns so we can support timeout
+        Future future = getExecutorService().submit(new ShutdownTask(context, consumers));
+        try {
+            if (timeout > 0) {
+                future.get(timeout, timeUnit);
+            } else {
+                future.get();
+            }
+        } catch (TimeoutException e) {
+            // timeout then cancel the task
+            future.cancel(true);
+
+            if (shutdownNowOnTimeout) {
+                LOG.warn("Timeout occurred. Now forcing all routes to be shutdown now.");
+                // force the consumers to shutdown now
+                shutdownNow(consumers);
+            } else {
+                LOG.warn("Timeout occurred. Will ignore shutting down the remainder route
input consumers.");
+            }
+        } catch (ExecutionException e) {
+            // unwrap execution exception
+            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
+        }
+
+        long delta = System.currentTimeMillis() - start;
+        // convert to seconds as its easier to read than a big milli seconds number 
+        long seconds = TimeUnit.SECONDS.convert(delta, TimeUnit.MILLISECONDS);
+
+        LOG.info("Graceful shutdown of routes completed in " + seconds + " seconds");
+    }
+
+    /**
+     * Set a timeout to wait for the shutdown to complete.
+     * <p/>
+     * Setting a value of 0 or negative will disable timeout and wait until complete
+     * (potentially blocking forever)
+     *
+     * @param timeout timeout in the configured time unit (seconds by default)
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * Set the time unit to use
+     *
+     * @param timeUnit the unit to use
+     */
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Sets whether to force shutdown of all consumers when a timeout occurred and thus
+     * not all consumers were shut down within that period.
+     *
+     * @param shutdownNowOnTimeout <tt>true</tt> to force shutdown, <tt>false</tt>
to leave them running
+     */
+    public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
+        this.shutdownNowOnTimeout = shutdownNowOnTimeout;
+    }
+
+    protected void shutdownNow(List<Consumer> consumers) {
+        for (Consumer consumer : consumers) {
+            shutdownConsumer(consumer);
+        }
+    }
+
+    protected void shutdownConsumer(Consumer consumer) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Shutting down: " + consumer);
+        }
+
+        // allow us to do custom work before delegating to service helper
+        try {
+            ServiceHelper.stopService(consumer);
+        } catch (Exception e) {
+            LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception
will be ignored.");
+            // fire event
+            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(),
consumer, e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Shutdown complete for: " + consumer);
+        }
+    }
+
+    private ExecutorService getExecutorService() {
+        if (executor == null) {
+            executor = ExecutorServiceHelper.newSingleThreadExecutor("ShutdownTask", true);
+        }
+        return executor;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+        executor = null;
+    }
+
+    class ShutdownTask implements Runnable {
+
+        private final CamelContext context;
+        private final List<Consumer> consumers;
+
+        public ShutdownTask(CamelContext context, List<Consumer> consumers) {
+            this.context = context;
+            this.consumers = consumers;
+        }
+
+        public void run() {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("There are " + consumers.size() + " routes to shutdown");
+            }
+
+            // list of deferred consumers to shutdown when all exchanges has been completed
routed
+            // and thus there are no more inflight exchanges so they can be safely shutdown
at that time
+            List<Consumer> deferredConsumers = new ArrayList<Consumer>();
+
+            for (Consumer consumer : consumers) {
+
+                // some consumers do not support shutting down so let them decide
+                boolean shutdown = true;
+                if (consumer instanceof ShutdownAware) {
+                    shutdown = ((ShutdownAware) consumer).deferShutdown();
+                }
+
+                if (shutdown) {
+                    shutdownConsumer(consumer);
+                } else {
+                    // we will stop it later, but for now it must run to be able to help
all inflight messages
+                    // be safely completed
+                    deferredConsumers.add(consumer);
+                }
+            }
+
+            // wait till there are no more pending inflight messages
+            boolean done = false;
+            while (!done) {
+                int size = 0;
+                for (Consumer consumer : consumers) {
+                    size += context.getInflightRepository().size(consumer.getEndpoint());
+                    // include any additional pending exchanges on some consumers which may
have internal
+                    // memory queues such as seda
+                    if (consumer instanceof ShutdownAware) {
+                        size += ((ShutdownAware) consumer).getPendingExchanges();
+                    }
+                }
+                if (size > 0) {
+                    try {
+                        LOG.info("Waiting as there are still " + size + " inflight exchanges
to complete before we can shutdown");
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interrupted while waiting during graceful shutdown, will
force shutdown now.");
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                } else {
+                    done = true;
+                }
+            }
+
+            // now all messages has been completed then stop the deferred consumers
+            shutdownNow(deferredConsumers);
+        }
+
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Sun
Dec 20 12:27:04 2009
@@ -214,8 +214,6 @@
         return errorHandler;
     }
 
-
-
     /**
      * Adds the given list of interceptors to the channel.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Sun Dec 20 12:27:04 2009
@@ -85,7 +85,7 @@
                 try {
                     sleep(delay);
                 } catch (InterruptedException e) {
-                    handleSleepInteruptedException(e);
+                    handleSleepInterruptedException(e);
                 }
             }
         }
@@ -106,10 +106,10 @@
     }
 
     /**
-     * Called when a sleep is interupted; allows derived classes to handle this
+     * Called when a sleep is interrupted; allows derived classes to handle this
      * case differently
      */
-    protected void handleSleepInteruptedException(InterruptedException e) {
+    protected void handleSleepInterruptedException(InterruptedException e) {
         if (log.isDebugEnabled()) {
             log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Sun Dec 20 12:27:04 2009
@@ -16,15 +16,11 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.RejectedExecutionException;
-
-import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -96,10 +92,6 @@
      */
     protected void processErrorHandler(final Exchange exchange, final RedeliveryData data)
throws Exception {
         while (true) {
-            // we can't keep retrying if the route is being shutdown
-            if (!isRunAllowed(exchange)) {
-                return;
-            }
 
             // did previous processing cause an exception?
             boolean handle = shouldHandleException(exchange);
@@ -141,11 +133,6 @@
                     continue;
                 }
 
-                // do a sanity check whether we are still allowed to run before continuing
as we just woke up
-                if (!isRunAllowed(exchange)) {
-                    return;
-                }
-
                 // letting onRedeliver be executed
                 deliverToRedeliveryProcessor(exchange, data);
             }
@@ -167,45 +154,6 @@
     }
 
     /**
-     * Strategy the detect if we are allowed to run
-     * <p/>
-     * This implementation detects if Camel is shutting down as well
-     *
-     * @param exchange  the exchange
-     * @return <tt>true</tt> if we can process the exchange, <tt>false</tt>
to stop processing it
-     */
-    protected boolean isRunAllowed(Exchange exchange) {
-        // check if camel is stopping
-        boolean stoppingCamel = false;
-        CamelContext context = exchange.getContext();
-        if (context instanceof ServiceSupport) {
-            stoppingCamel = !((ServiceSupport) context).isRunAllowed();
-        }
-
-        if (stoppingCamel || !isRunAllowed()) {
-
-            if (log.isDebugEnabled()) {
-                boolean stopping = isStopping() || isStopped();
-                if (stopping) {
-                    log.debug("Rejected execution as we are stopping for exchange: " + exchange);
-                } else {
-                    log.debug("Rejected execution as we are not started for exchange: " +
exchange);
-                }
-            }
-
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException());
-            }
-
-            // and stop continue routing
-            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
      * Strategy whether the exchange has an exception that we should try to handle.
      * <p/>
      * Standard implementations should just look for an exception.

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=892587&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java Sun Dec 20
12:27:04 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * Allows a {@link org.apache.camel.Consumer} to have fine grained control over shutdown, which is mostly
+ * needed by in-memory based components. These components need to be able to have
an extra
+ * chance to have their pending exchanges completed to support graceful shutdown. This
helps
+ * ensure that no messages get lost.
+ *
+ * @version $Revision$
+ * @see org.apache.camel.spi.ShutdownStrategy
+ */
+public interface ShutdownAware {
+
+    /**
+     * To defer shutdown during first phase of shutdown. This allows any pending exchanges
to be completed
+     * and therefore ensure a graceful shutdown without losing messages. At the very end
when there are no
+     * more inflight and pending messages the consumer could then safely be shutdown.
+     * <p/>
+     * This is needed by {@link org.apache.camel.component.seda.SedaConsumer}.
+     *
+     * @return <tt>true</tt> to defer shutdown to very last.
+     */
+    boolean deferShutdown();
+
+    /**
+     * Some consumers have internal queues with {@link org.apache.camel.Exchange} which are
pending.
+     * <p/>
+     * Return <tt>zero</tt> to indicate no pending exchanges and therefore ready
to shutdown.
+     *
+     * @return number of pending exchanges
+     */
+    int getPendingExchanges();
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=892587&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java Sun Dec
20 12:27:04 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+
+/**
+ * Pluggable shutdown strategy executed during shutdown of routes.
+ * <p/>
+ * Shutting down routes in a reliable and graceful manner is not a trivial task. Therefore
Camel provides a pluggable
+ * strategy allowing 3rd party to use their own strategy if needed.
+ * <p/>
+ * The key problem is to stop the input consumers for the routes such that no new messages
are coming into Camel.
+ * But at the same time still keep the routes running so the existing in flight exchanges
can still be run to
+ * completion. On top of that there are some in memory components (such as SEDA) which may
have pending messages
+ * on their in-memory queues which we want to run to completion as well, otherwise they will
get lost.
+ * <p/>
+ * Camel provides a default strategy which supports all of that and can be used as inspiration
for your own strategy.
+ *
+ * @version $Revision$
+ * @see org.apache.camel.spi.ShutdownAware
+ */
+public interface ShutdownStrategy {
+
+    /**
+     * Shutdown the routes
+     *
+     * @param context the camel context
+     * @param consumers the consumers for the routes, ordered by the order they were started
+     * @throws Exception is thrown if an error occurs while shutting down the consumers; however, it's preferred
to avoid this
+     */
+    void shutdown(CamelContext context, List<Consumer> consumers) throws Exception;
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
Sun Dec 20 12:27:04 2009
@@ -50,21 +50,23 @@
 
         context.stop();
 
-        assertEquals(6, dummy1.getEvents().size());
-        assertEquals(6, dummy2.getEvents().size());
+        assertEquals(7, dummy1.getEvents().size());
+        assertEquals(7, dummy2.getEvents().size());
 
         assertEquals("onContextStart", dummy1.getEvents().get(0));
         assertEquals("onContextStart", dummy2.getEvents().get(0));
         assertEquals("onServiceAdd", dummy1.getEvents().get(1));
         assertEquals("onServiceAdd", dummy2.getEvents().get(1));
-        assertEquals("onComponentAdd", dummy1.getEvents().get(2));
-        assertEquals("onComponentAdd", dummy2.getEvents().get(2));
-        assertEquals("onEndpointAdd", dummy1.getEvents().get(3));
-        assertEquals("onEndpointAdd", dummy2.getEvents().get(3));
-        assertEquals("onComponentRemove", dummy1.getEvents().get(4));
-        assertEquals("onComponentRemove", dummy2.getEvents().get(4));
-        assertEquals("onContextStop", dummy1.getEvents().get(5));
-        assertEquals("onContextStop", dummy2.getEvents().get(5));
+        assertEquals("onServiceAdd", dummy1.getEvents().get(2));
+        assertEquals("onServiceAdd", dummy2.getEvents().get(2));
+        assertEquals("onComponentAdd", dummy1.getEvents().get(3));
+        assertEquals("onComponentAdd", dummy2.getEvents().get(3));
+        assertEquals("onEndpointAdd", dummy1.getEvents().get(4));
+        assertEquals("onEndpointAdd", dummy2.getEvents().get(4));
+        assertEquals("onComponentRemove", dummy1.getEvents().get(5));
+        assertEquals("onComponentRemove", dummy2.getEvents().get(5));
+        assertEquals("onContextStop", dummy1.getEvents().get(6));
+        assertEquals("onContextStop", dummy2.getEvents().get(6));
     }
 
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java?rev=892587&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
Sun Dec 20 12:27:04 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
+
+    private static String foo = "";
+
+    public void testShutdownGraceful() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+
+        template.sendBody("seda:foo", "A");
+        template.sendBody("seda:foo", "B");
+        template.sendBody("seda:foo", "C");
+        template.sendBody("seda:foo", "D");
+        template.sendBody("seda:foo", "E");
+
+        assertMockEndpointsSatisfied();
+
+        // now stop the route before it's complete
+        foo = foo + "stop";
+        context.stop();
+
+        // it should wait as there was 1 inflight exchange and 4 pending messages left
+        assertEquals("Should graceful shutdown", "stopABCDE", foo);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").to("mock:foo").delay(1000).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        foo = foo + exchange.getIn().getBody(String.class);
+                    }
+                });
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java?rev=892587&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
Sun Dec 20 12:27:04 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SimpleShutdownGracefulTest extends ContextTestSupport {
+
+    private static String foo = "";
+
+    public void testShutdownGraceful() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        template.sendBody("seda:foo", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        // now stop the route before it's complete
+        foo = foo + "stop";
+        context.stop();
+
+        // it should wait as there was 1 inflight exchange when asked to stop
+        assertEquals("Should graceful shutdown", "stopHello World", foo);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").to("mock:foo").delay(2000).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        foo = foo + exchange.getIn().getBody(String.class);
+                    }
+                });
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/SimpleShutdownGracefulTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Sun Dec 20 12:27:04 2009
@@ -27,6 +27,7 @@
 log4j.logger.org.apache.camel.impl.converter=WARN
 log4j.logger.org.apache.camel.management=WARN
 log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.impl=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=892587&r1=892586&r2=892587&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
(original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Sun Dec 20 12:27:04 2009
@@ -67,6 +67,7 @@
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.Registry;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
@@ -252,6 +253,12 @@
             getContext().getManagementStrategy().setEventNotifier(eventNotifier);
         }
 
+        ShutdownStrategy shutdownStrategy = getBeanForType(ShutdownStrategy.class);
+        if (shutdownStrategy != null) {
+            LOG.info("Using custom ShutdownStrategy: " + shutdownStrategy);
+            getContext().setShutdownStrategy(shutdownStrategy);
+        }
+
         // add global interceptors
         Map<String, InterceptStrategy> interceptStrategies = getContext().getRegistry().lookupByType(InterceptStrategy.class);
         if (interceptStrategies != null && !interceptStrategies.isEmpty()) {



Mime
View raw message