camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject git commit: CAMEL-6688: Added option SuppressLoggingOnTimeout to allow shutdown to not log after timeout occurred and doing aggressive shutdown which otherwise may log WARNs about messages not being complete and whatnot.
Date Mon, 02 Sep 2013 08:49:54 GMT
Updated Branches:
  refs/heads/master 06952f7ab -> 0bdf8431d


CAMEL-6688: Added option SuppressLoggingOnTimeout to allow shutdown to not log after timeout
occurred and doing aggressive shutdown which otherwise may log WARNs about messages not being
complete and whatnot.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0bdf8431
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0bdf8431
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0bdf8431

Branch: refs/heads/master
Commit: 0bdf8431df29986fa89ff50bc5c594c99d4e5711
Parents: 06952f7
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Sep 2 10:42:52 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Sep 2 10:49:04 2013 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileOnCompletion.java | 14 +++---
 .../camel/component/seda/SedaConsumer.java      |  4 +-
 .../BridgeExceptionHandlerToErrorHandler.java   |  2 +-
 .../org/apache/camel/impl/DefaultConsumer.java  |  4 +-
 .../camel/impl/DefaultShutdownStrategy.java     | 41 ++++++++++++---
 .../camel/impl/EventDrivenPollingConsumer.java  |  3 +-
 .../camel/impl/LoggingExceptionHandler.java     | 52 ++++++++++++++++----
 .../camel/impl/PollingConsumerSupport.java      |  4 +-
 .../apache/camel/impl/RoutePolicySupport.java   | 11 +++--
 .../apache/camel/processor/BatchProcessor.java  |  4 +-
 .../camel/processor/StreamResequencer.java      |  2 +-
 .../processor/aggregate/AggregateProcessor.java |  3 +-
 .../org/apache/camel/spi/ShutdownStrategy.java  | 31 ++++++++++++
 ...ownStrategySuppressLoggingOnTimeoutTest.java | 46 +++++++++++++++++
 14 files changed, 175 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
index 9511159..5bedc00 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
@@ -47,6 +47,7 @@ public class GenericFileOnCompletion<T> implements Synchronization
{
         this.operations = operations;
         this.file = file;
         this.absoluteFileName = absoluteFileName;
+        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
     }
 
     public void onComplete(Exchange exchange) {
@@ -58,9 +59,6 @@ public class GenericFileOnCompletion<T> implements Synchronization
{
     }
 
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 
@@ -136,7 +134,7 @@ public class GenericFileOnCompletion<T> implements Synchronization
{
                         log.warn("Done file: " + doneFileName + " could not be deleted");
                     }
                 } catch (Exception e) {
-                    handleException(e);
+                    handleException("Error deleting done file: " + doneFileName, exchange,
e);
                 }
             }
         }
@@ -145,7 +143,7 @@ public class GenericFileOnCompletion<T> implements Synchronization
{
             log.trace("Commit file strategy: {} for file: {}", processStrategy, file);
             processStrategy.commit(operations, endpoint, exchange, file);
         } catch (Exception e) {
-            handleException(e);
+            handleException("Error during commit", exchange, e);
         }
     }
 
@@ -165,13 +163,13 @@ public class GenericFileOnCompletion<T> implements Synchronization
{
         try {
             processStrategy.rollback(operations, endpoint, exchange, file);
         } catch (Exception e) {
-            handleException(e);
+            handleException("Error during rollback", exchange, e);
         }
     }
 
-    protected void handleException(Throwable t) {
+    protected void handleException(String message, Exchange exchange, Throwable t) {
         Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception")
: t;
-        getExceptionHandler().handleException(newt);
+        getExceptionHandler().handleException(message, exchange, newt);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index ce6d69d..d0f47bd 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -69,6 +69,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
         this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.pollTimeout = endpoint.getPollTimeout();
+        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
     }
 
     @Override
@@ -81,9 +82,6 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     }
 
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java
b/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java
index 12ba590..6292189 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java
@@ -44,7 +44,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler
{
 
     public BridgeExceptionHandlerToErrorHandler(DefaultConsumer consumer) {
         this.consumer = consumer;
-        this.fallback = new LoggingExceptionHandler(consumer.getClass());
+        this.fallback = new LoggingExceptionHandler(consumer.getEndpoint().getCamelContext(),
consumer.getClass());
         this.bridge = consumer.getProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index fd400ec..74e9555 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -46,6 +46,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer
{
     public DefaultConsumer(Endpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
         this.processor = processor;
+        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
     }
 
     @Override
@@ -105,9 +106,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer
{
     }
 
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
index d94b34e..fddd80a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
@@ -93,6 +93,11 @@ import org.slf4j.LoggerFactory;
  * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
  * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
  * on the services. This allows the services to know they should force shutdown now.
+ * <p/>
+ * When timeout occurred and a forced shutdown is happening, then there may be threads/tasks
which are
+ * still inflight which may be rejected continued being routed. By default this can cause
WARN and ERRORs
+ * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to
suppress these
+ * logs, so they are logged at TRACE level instead.
  *
  * @version 
  */
@@ -105,7 +110,9 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
     private TimeUnit timeUnit = TimeUnit.SECONDS;
     private boolean shutdownNowOnTimeout = true;
     private boolean shutdownRoutesInReverseOrder = true;
+    private boolean suppressLoggingOnTimeout;
     private volatile boolean forceShutdown;
+    private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
 
     public DefaultShutdownStrategy() {
     }
@@ -165,7 +172,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
         LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout
" + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
 
         // use another thread to perform the shutdowns so we can support timeout
-        final AtomicBoolean timeoutOccurred = new AtomicBoolean();
+        timeoutOccurred.set(false);
         Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered,
timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
         try {
             future.get(timeout, timeUnit);
@@ -192,7 +199,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
                     // now the route consumers has been shutdown, then prepare route services
for shutdown now (forced)
                     for (RouteStartupOrder order : routes) {
                         for (Service service : order.getServices()) {
-                            prepareShutdown(service, true, true);
+                            prepareShutdown(service, true, true, isSuppressLoggingOnTimeout());
                         }
                     }
                 } else {
@@ -216,6 +223,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
         return forceShutdown;
     }
 
+    @Override
+    public boolean hasTimeoutOccurred() {
+        return timeoutOccurred.get();
+    }
+
     public void setTimeout(long timeout) {
         if (timeout <= 0) {
             throw new IllegalArgumentException("Timeout must be a positive value");
@@ -251,6 +263,14 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
         this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
     }
 
+    public boolean isSuppressLoggingOnTimeout() {
+        return suppressLoggingOnTimeout;
+    }
+
+    public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) {
+        this.suppressLoggingOnTimeout = suppressLoggingOnTimeout;
+    }
+
     public CamelContext getCamelContext() {
         return camelContext;
     }
@@ -345,6 +365,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
         ObjectHelper.notNull(camelContext, "CamelContext");
         // reset option
         forceShutdown = false;
+        timeoutOccurred.set(false);
     }
 
     @Override
@@ -370,7 +391,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
      * @param forced  whether to force shutdown
      * @param includeChildren whether to prepare the child of the service as well
      */
-    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren)
{
+    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren,
boolean suppressLogging) {
         Set<Service> list;
         if (includeChildren) {
             // include error handlers as we want to prepare them for shutdown as well
@@ -386,7 +407,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
                     LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
                     ((ShutdownPrepared) child).prepareShutdown(forced);
                 } catch (Exception e) {
-                    LOG.warn("Error during prepare shutdown on " + child + ". This exception
will be ignored.", e);
+                    if (suppressLogging) {
+                        LOG.trace("Error during prepare shutdown on " + child + ". This exception
will be ignored.", e);
+                    } else {
+                        LOG.warn("Error during prepare shutdown on " + child + ". This exception
will be ignored.", e);
+                    }
                 }
             }
         }
@@ -509,7 +534,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
                     if (service instanceof Consumer) {
                         continue;
                     }
-                    prepareShutdown(service, false, true);
+                    prepareShutdown(service, false, true, false);
                 }
             }
 
@@ -558,7 +583,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
                 if (consumer instanceof ShutdownAware) {
                     LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
                     boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
-                    prepareShutdown(consumer, forced, false);
+                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
+                    prepareShutdown(consumer, forced, false, suppress);
                     LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
                 }
             }
@@ -579,7 +605,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements
ShutdownS
             for (RouteStartupOrder order : routes) {
                 for (Service service : order.getServices()) {
                     boolean forced = context.getShutdownStrategy().forceShutdown(service);
-                    prepareShutdown(service, forced, true);
+                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
+                    prepareShutdown(service, forced, true, suppress);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 27d9cc4..07bcf07 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor
{
     private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
     private final BlockingQueue<Exchange> queue;
-    private ExceptionHandler interruptedExceptionHandler = new LoggingExceptionHandler(EventDrivenPollingConsumer.class);
+    private ExceptionHandler interruptedExceptionHandler;
     private Consumer consumer;
 
     public EventDrivenPollingConsumer(Endpoint endpoint) {
@@ -51,6 +51,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
     public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue)
{
         super(endpoint);
         this.queue = queue;
+        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(),
EventDrivenPollingConsumer.class);
     }
 
     public Exchange receiveNoWait() {

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
index 6518ad4..ba3fc28 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -29,21 +30,41 @@ import org.slf4j.LoggerFactory;
  * log the exception.
  * <p/>
  * This implementation will by default log the exception with stack trace at WARN level.
+ * <p/>
+ * This implementation honors the {@link org.apache.camel.impl.DefaultShutdownStrategy#isSuppressLoggingOnTimeout()}
+ * option to avoid logging if the logging should be suppressed.
  *
  * @version 
  */
 public class LoggingExceptionHandler implements ExceptionHandler {
     private final CamelLogger logger;
+    private final CamelContext camelContext;
 
+    @Deprecated
     public LoggingExceptionHandler(Class<?> ownerType) {
-        this(new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN));
+        this(null, new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN));
+    }
+
+    public LoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType) {
+        this(camelContext, new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN));
     }
 
+    @Deprecated
     public LoggingExceptionHandler(Class<?> ownerType, LoggingLevel level) {
-        this(new CamelLogger(LoggerFactory.getLogger(ownerType), level));
+        this(null, new CamelLogger(LoggerFactory.getLogger(ownerType), level));
     }
 
+    public LoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType, LoggingLevel
level) {
+        this(camelContext, new CamelLogger(LoggerFactory.getLogger(ownerType), level));
+    }
+
+    @Deprecated
     public LoggingExceptionHandler(CamelLogger logger) {
+        this(null, logger);
+    }
+
+    public LoggingExceptionHandler(CamelContext camelContext, CamelLogger logger) {
+        this.camelContext = camelContext;
         this.logger = logger;
     }
 
@@ -57,15 +78,17 @@ public class LoggingExceptionHandler implements ExceptionHandler {
 
     public void handleException(String message, Exchange exchange, Throwable exception) {
         try {
-            String msg = CamelExchangeException.createExceptionMessage(message, exchange,
exception);
-            if (isCausedByRollbackExchangeException(exception)) {
-                // do not log stack trace for intended rollbacks
-                logger.log(msg);
-            } else {
-                if (exception != null) {
-                    logger.log(msg, exception);
-                } else {
+            if (!isSuppressLogging()) {
+                String msg = CamelExchangeException.createExceptionMessage(message, exchange,
exception);
+                if (isCausedByRollbackExchangeException(exception)) {
+                    // do not log stack trace for intended rollbacks
                     logger.log(msg);
+                } else {
+                    if (exception != null) {
+                        logger.log(msg, exception);
+                    } else {
+                        logger.log(msg);
+                    }
                 }
             }
         } catch (Throwable e) {
@@ -86,4 +109,13 @@ public class LoggingExceptionHandler implements ExceptionHandler {
 
         return false;
     }
+
+    protected boolean isSuppressLogging() {
+        if (camelContext != null) {
+            return (camelContext.getStatus().isStopping() || camelContext.getStatus().isStopped())
+                    && camelContext.getShutdownStrategy().hasTimeoutOccurred() &&
camelContext.getShutdownStrategy().isSuppressLoggingOnTimeout();
+        } else {
+            return false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java b/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
index 626ed0a..95e0a75 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
@@ -35,6 +35,7 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements
P
 
     public PollingConsumerSupport(Endpoint endpoint) {
         this.endpoint = endpoint;
+        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
     }
 
     @Override
@@ -47,9 +48,6 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements
P
     }
 
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
index f39cb76..c76b4e2 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
@@ -41,7 +41,9 @@ public abstract class RoutePolicySupport extends ServiceSupport implements
Route
     private ExceptionHandler exceptionHandler;
 
     public void onInit(Route route) {
-        // noop
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(route.getRouteContext().getCamelContext(),
getClass());
+        }
     }
 
     public void onRemove(Route route) {
@@ -122,7 +124,9 @@ public abstract class RoutePolicySupport extends ServiceSupport implements
Route
      * @param t the exception to handle
      */
     protected void handleException(Throwable t) {
-        getExceptionHandler().handleException(t);
+        if (exceptionHandler != null) {
+            exceptionHandler.handleException(t);
+        }
     }
 
     @Override
@@ -136,9 +140,6 @@ public abstract class RoutePolicySupport extends ServiceSupport implements
Route
     }
 
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
index 3caea14..2bb93f7 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
@@ -88,6 +88,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor,
Na
         this.collection = collection;
         this.expression = expression;
         this.sender = new BatchSender();
+        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
     @Override
@@ -98,9 +99,6 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor,
Na
     // Properties
     // -------------------------------------------------------------------------
     public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
         return exceptionHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
index dea7fcf..c8426d7 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -87,10 +87,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
     public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange>
comparator) {
         ObjectHelper.notNull(camelContext, "CamelContext");
         this.camelContext = camelContext;
-        this.exceptionHandler = new LoggingExceptionHandler(getClass());
         this.engine = new ResequencerEngine<Exchange>(comparator);
         this.engine.setSequenceSender(this);
         this.processor = processor;
+        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 0f716fb..8e2a26f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -98,7 +98,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private ScheduledExecutorService recoverService;
     // store correlation key -> exchange id in timeout map
     private TimeoutMap<String, String> timeoutMap;
-    private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
+    private ExceptionHandler exceptionHandler;
     private AggregationRepository aggregationRepository;
     private Map<String, String> closedCorrelationKeys;
     private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>();
@@ -145,6 +145,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         this.aggregationStrategy = aggregationStrategy;
         this.executorService = executorService;
         this.shutdownExecutorService = shutdownExecutorService;
+        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
index b063304..b9a9187 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
@@ -144,6 +144,32 @@ public interface ShutdownStrategy extends StaticService {
     TimeUnit getTimeUnit();
 
     /**
+     * Whether Camel should try to suppress logging during shutdown and timeout was triggered,
+     * meaning forced shutdown is happening. And during forced shutdown we want to avoid
logging
+     * errors/warnings et all in the logs as a side-effect of the forced timeout.
+     * <p/>
+     * By default this is <tt>false</tt>
+     * <p/>
+     * Notice the suppress is a <i>best effort</i> as there may still be some
logs coming
+     * from 3rd party libraries and whatnot, which Camel cannot control.
+     *
+     * @param suppressLoggingOnTimeout <tt>true</tt> to suppress logging, false
to log as usual.
+     */
+    void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout);
+
+    /**
+     * Whether Camel should try to suppress logging during shutdown and timeout was triggered,
+     * meaning forced shutdown is happening. And during forced shutdown we want to avoid
logging
+     * errors/warnings et all in the logs as a side-effect of the forced timeout.
+     * <p/>
+     * By default this is <tt>false</tt>
+     * <p/>
+     * Notice the suppress is a <i>best effort</i> as there may still be some
logs coming
+     * from 3rd party libraries and whatnot, which Camel cannot control.
+     */
+    boolean isSuppressLoggingOnTimeout();
+
+    /**
      * Sets whether to force shutdown of all consumers when a timeout occurred and thus
      * not all consumers was shutdown within that period.
      * <p/>
@@ -191,4 +217,9 @@ public interface ShutdownStrategy extends StaticService {
      */
     boolean forceShutdown(Service service);
 
+    /**
+     * Whether a timeout has occurred during a shutdown.
+     */
+    boolean hasTimeoutOccurred();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java
b/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java
new file mode 100644
index 0000000..9c77862
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class ShutdownStrategySuppressLoggingOnTimeoutTest extends ContextTestSupport {
+
+    public void testSuppressLogging() throws Exception {
+        context.getShutdownStrategy().setTimeout(1);
+        context.getShutdownStrategy().setSuppressLoggingOnTimeout(true);
+
+        template.sendBody("seda:foo", "Hello World");
+
+        Thread.sleep(2000);
+
+        context.stop();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo")
+                    .delay(8000)
+                    .to("log:out");
+            }
+        };
+    }
+}


Mime
View raw message