camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1066758 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/main/j...
Date Thu, 03 Feb 2011 08:38:17 GMT
Author: davsclaus
Date: Thu Feb  3 08:38:16 2011
New Revision: 1066758

URL: http://svn.apache.org/viewvc?rev=1066758&view=rev
Log:
CAMEL-3610: Camel now supports MDC logging.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MDCUnitOfWork.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCTest.java
      - copied, changed from r1066353, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncMDCTest.java
      - copied, changed from r1066353, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringMDCTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringMDCTest.xml
      - copied, changed from r1066732, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/defaultJmxConfig.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultUnitOfWorkTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
    camel/trunk/camel-core/src/test/resources/log4j.properties
    camel/trunk/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java
    camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
    camel/trunk/components/camel-spring/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Thu Feb  3 08:38:16 2011
@@ -968,4 +968,18 @@ public interface CamelContext extends Su
      */
     void setLazyLoadTypeConverters(Boolean lazyLoadTypeConverters);
 
+    /**
+     * Whether or not <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a> logging is being enabled.
+     *
+     * @return <tt>true</tt> if MDC logging is enabled
+     */
+    Boolean isUseMDCLogging();
+
+    /**
+     * Set whether <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a> is enabled.
+     *
+     * @param useMDCLogging <tt>true</tt> to enable MDC logging, <tt>false</tt> to disable
+     */
+    void setUseMDCLogging(Boolean useMDCLogging);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Thu Feb  3 08:38:16 2011
@@ -166,6 +166,7 @@ public class DefaultCamelContext extends
     private Boolean handleFault = Boolean.FALSE;
     private Boolean disableJMX = Boolean.FALSE;
     private Boolean lazyLoadTypeConverters = Boolean.FALSE;
+    private Boolean useMDCLogging = Boolean.FALSE;
     private Long delay;
     private ErrorHandlerBuilder errorHandlerBuilder;
     private Map<String, DataFormatDefinition> dataFormats = new HashMap<String, DataFormatDefinition>();
@@ -1341,6 +1342,11 @@ public class DefaultCamelContext extends
             log.info("Tracing is enabled on CamelContext: " + getName());
         }
 
+        if (isUseMDCLogging()) {
+            // log if MDC has been enabled
+            log.info("MDC logging is enabled on CamelContext: " + getName());
+        }
+
         if (isHandleFault()) {
             // only add a new handle fault if not already configured
             if (HandleFault.getHandleFault(this) == null) {
@@ -2144,6 +2150,14 @@ public class DefaultCamelContext extends
         this.lazyLoadTypeConverters = lazyLoadTypeConverters;
     }
 
+    public Boolean isUseMDCLogging() {
+        return useMDCLogging != null && useMDCLogging;
+    }
+
+    public void setUseMDCLogging(Boolean useMDCLogging) {
+        this.useMDCLogging = useMDCLogging;
+    }
+
     public ClassLoader getApplicationContextClassLoader() {
         return applicationContextClassLoader;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Thu Feb  3 08:38:16 2011
@@ -24,9 +24,11 @@ import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.Synchronization;
@@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * @version $Revision$
  */
 public class DefaultUnitOfWork implements UnitOfWork, Service {
-    private static final transient Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
+    protected final transient Logger log = LoggerFactory.getLogger(getClass());
 
     private String id;
     private CamelContext context;
@@ -55,8 +57,8 @@ public class DefaultUnitOfWork implement
     private final Stack<RouteContext> routeContextStack = new Stack<RouteContext>();
 
     public DefaultUnitOfWork(Exchange exchange) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("UnitOfWork created for ExchangeId: " + exchange.getExchangeId() + " with " + exchange);
+        if (log.isTraceEnabled()) {
+            log.trace("UnitOfWork created for ExchangeId: " + exchange.getExchangeId() + " with " + exchange);
         }
         tracedRouteNodes = new DefaultTracedRouteNodes();
         context = exchange.getContext();
@@ -113,8 +115,8 @@ public class DefaultUnitOfWork implement
         if (synchronizations == null) {
             synchronizations = new ArrayList<Synchronization>();
         }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Adding synchronization " + synchronization);
+        if (log.isTraceEnabled()) {
+            log.trace("Adding synchronization " + synchronization);
         }
         synchronizations.add(synchronization);
     }
@@ -141,29 +143,29 @@ public class DefaultUnitOfWork implement
             }
 
             if (handover) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Handover synchronization " + synchronization + " to: " + target);
+                if (log.isTraceEnabled()) {
+                    log.trace("Handover synchronization " + synchronization + " to: " + target);
                 }
                 target.addOnCompletion(synchronization);
                 // remove it if its handed over
                 it.remove();
             } else {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Handover not allow for synchronization " + synchronization);
+                if (log.isTraceEnabled()) {
+                    log.trace("Handover not allow for synchronization " + synchronization);
                 }
             }
         }
     }
    
     public void done(Exchange exchange) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("UnitOfWork done for ExchangeId: " + exchange.getExchangeId() + " with " + exchange);
+        if (log.isTraceEnabled()) {
+            log.trace("UnitOfWork done for ExchangeId: " + exchange.getExchangeId() + " with " + exchange);
         }
 
         boolean failed = exchange.isFailed();
 
         // at first done the synchronizations
-        UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
+        UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
 
         // then fire event to signal the exchange is done
         try {
@@ -174,7 +176,7 @@ public class DefaultUnitOfWork implement
             }
         } catch (Throwable e) {
             // must catch exceptions to ensure synchronizations is also invoked
-            LOG.warn("Exception occurred during event notification. This exception will be ignored.", e);
+            log.warn("Exception occurred during event notification. This exception will be ignored.", e);
         } finally {
             // unregister from inflight registry
             if (exchange.getContext() != null) {
@@ -202,16 +204,16 @@ public class DefaultUnitOfWork implement
         return transactedBy != null && !transactedBy.isEmpty();
     }
 
-    public boolean isTransactedBy(Object transactionDefinition) {
-        return getTransactedBy().contains(transactionDefinition);
+    public boolean isTransactedBy(Object key) {
+        return getTransactedBy().contains(key);
     }
 
-    public void beginTransactedBy(Object transactionDefinition) {
-        getTransactedBy().add(transactionDefinition);
+    public void beginTransactedBy(Object key) {
+        getTransactedBy().add(key);
     }
 
-    public void endTransactedBy(Object transactionDefinition) {
-        getTransactedBy().remove(transactionDefinition);
+    public void endTransactedBy(Object key) {
+        getTransactedBy().remove(key);
     }
 
     public RouteContext getRouteContext() {
@@ -232,10 +234,24 @@ public class DefaultUnitOfWork implement
         return routeContextStack.pop();
     }
 
+    public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
+        // no wrapping needed
+        return callback;
+    }
+
+    public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
+        // noop
+    }
+
     private Set<Object> getTransactedBy() {
         if (transactedBy == null) {
             transactedBy = new LinkedHashSet<Object>();
         }
         return transactedBy;
     }
+
+    @Override
+    public String toString() {
+        return "DefaultUnitOfWork";
+    }
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MDCUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MDCUnitOfWork.java?rev=1066758&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MDCUnitOfWork.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MDCUnitOfWork.java Thu Feb  3 08:38:16 2011
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.RouteContext;
+import org.slf4j.MDC;
+
+/**
+ * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
+ *
+ * @version $Revision$
+ */
+public class MDCUnitOfWork extends DefaultUnitOfWork {
+
+    public static final String MDC_EXCHANGE_ID = "exchangeId";
+    public static final String MDC_CORRELATION_ID = "correlationId";
+    public static final String MDC_ROUTE_ID = "routeId";
+
+    public MDCUnitOfWork(Exchange exchange) {
+        super(exchange);
+        // must add exchange id in constructor
+        MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId());
+        // and add optional correlation id
+        String corrId = exchange.getProperty(Exchange.CORRELATION_ID, String.class);
+        if (corrId != null) {
+            MDC.put(MDC_CORRELATION_ID, corrId);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+        // and remove when stopping
+        clear();
+    }
+
+    @Override
+    public void pushRouteContext(RouteContext routeContext) {
+        super.pushRouteContext(routeContext);
+        MDC.put(MDC_ROUTE_ID, routeContext.getRoute().getId());
+    }
+
+    @Override
+    public RouteContext popRouteContext() {
+        MDC.remove(MDC_ROUTE_ID);
+        return super.popRouteContext();
+    }
+
+    @Override
+    public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
+        return new MDCCallback(callback);
+    }
+
+    @Override
+    public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
+        if (!doneSync) {
+            // must clear MDC on current thread as the exchange is being processed asynchronously
+            // by another thread
+            clear();
+        }
+    }
+
+    /**
+     * Clears information put on the MDC by this {@link MDCUnitOfWork}
+     */
+    public void clear() {
+        MDC.remove(MDC_EXCHANGE_ID);
+        MDC.remove(MDC_CORRELATION_ID);
+        MDC.remove(MDC_ROUTE_ID);
+    }
+
+    @Override
+    public String toString() {
+        return "MDCUnitOfWork";
+    }
+
+    /**
+     * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when
+     * the asynchronous routing engine is being used.
+     */
+    private static final class MDCCallback implements AsyncCallback {
+
+        private final AsyncCallback delegate;
+        private final String exchangeId;
+        private final String correlationId;
+        private final String routeId;
+
+        private MDCCallback(AsyncCallback delegate) {
+            this.delegate = delegate;
+            this.exchangeId = MDC.get(MDC_EXCHANGE_ID);
+            this.correlationId = MDC.get(MDC_CORRELATION_ID);
+
+            String routeId = MDC.get(MDC_ROUTE_ID);
+            if (routeId != null) {
+                // intern route id as this reduces memory allocations
+                this.routeId = routeId.intern();
+            } else {
+                this.routeId = null;
+            }
+        }
+
+        public void done(boolean doneSync) {
+            if (!doneSync) {
+                // when done asynchronously then restore information from previous thread
+                if (exchangeId != null) {
+                    MDC.put(MDC_EXCHANGE_ID, exchangeId);
+                }
+                if (correlationId != null) {
+                    MDC.put(MDC_CORRELATION_ID, correlationId);
+                }
+                if (routeId != null) {
+                    MDC.put(MDC_ROUTE_ID, routeId);
+                }
+            }
+            delegate.done(doneSync);
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Thu Feb  3 08:38:16 2011
@@ -21,12 +21,12 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.UnitOfWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
-
 /**
  * Ensures the {@link Exchange} is routed under the boundaries of an {@link org.apache.camel.spi.UnitOfWork}.
  * <p/>
@@ -85,9 +85,16 @@ public final class UnitOfWorkProcessor e
         }
 
         if (exchange.getUnitOfWork() == null) {
+            UnitOfWork unitOfWork;
             // If there is no existing UoW, then we should start one and
             // terminate it once processing is completed for the exchange.
-            final DefaultUnitOfWork uow = new DefaultUnitOfWork(exchange);
+            if (exchange.getContext().isUseMDCLogging()) {
+                unitOfWork = new MDCUnitOfWork(exchange);
+            } else {
+                unitOfWork = new DefaultUnitOfWork(exchange);
+            }
+            final UnitOfWork uow = unitOfWork;
+
             exchange.setUnitOfWork(uow);
             try {
                 uow.start();
@@ -133,7 +140,7 @@ public final class UnitOfWorkProcessor e
         }
     }
 
-    private void doneUow(DefaultUnitOfWork uow, Exchange exchange) {
+    private void doneUow(UnitOfWork uow, Exchange exchange) {
         // unit of work is done
         try {
             if (exchange.getUnitOfWork() != null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java Thu Feb  3 08:38:16 2011
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
 
 /**
  * An object representing the unit of work processing an {@link Exchange}
@@ -26,7 +29,7 @@ import org.apache.camel.Message;
  *
  * @version $Revision$
  */
-public interface UnitOfWork {
+public interface UnitOfWork extends Service {
 
     /**
      * Adds a synchronization hook
@@ -89,34 +92,28 @@ public interface UnitOfWork {
     boolean isTransacted();
 
     /**
-     * Are we already transacted by the given transaction definition
-     * <p/>
-     * The definition will most likely be a Spring TransactionTemplate when using Spring Transaction
+     * Are we already transacted by the given transaction key?
      *
-     * @param transactionDefinition the transaction definition
+     * @param key the transaction key
      * @return <tt>true</tt> if already, <tt>false</tt> otherwise
      */
-    boolean isTransactedBy(Object transactionDefinition);
+    boolean isTransactedBy(Object key);
 
     /**
-     * Mark this UnitOfWork as being transacted by the given transaction definition.
-     * <p/>
-     * The definition will most likely be a Spring TransactionTemplate when using Spring Transaction
+     * Mark this UnitOfWork as being transacted by the given transaction key.
      * <p/>
-     * When the transaction is completed then invoke the {@link #endTransactedBy(Object)} method.
+     * When the transaction is completed then invoke the {@link #endTransactedBy(Object)} method using the same key.
      *
-     * @param transactionDefinition the transaction definition
+     * @param key the transaction key
      */
-    void beginTransactedBy(Object transactionDefinition);
+    void beginTransactedBy(Object key);
 
     /**
      * Mark this UnitOfWork as not transacted anymore by the given transaction definition.
-     * <p/>
-     * The definition will most likely be a Spring TransactionTemplate when using Spring Transaction
      *
-     * @param transactionDefinition the transaction definition
+     * @param key the transaction key
      */
-    void endTransactedBy(Object transactionDefinition);
+    void endTransactedBy(Object key);
 
     /**
      * Gets the {@link RouteContext} that this {@link UnitOfWork} currently is being routed through.
@@ -147,4 +144,28 @@ public interface UnitOfWork {
      * @return the route context or <tt>null</tt> if none existed
      */
     RouteContext popRouteContext();
+
+    /**
+     * Strategy for optional work to be execute before processing
+     * <p/>
+     * For example the {@link org.apache.camel.impl.MDCUnitOfWork} leverages this
+     * to ensure MDC is handled correctly during routing exchanges using the
+     * asynchronous routing engine.
+     *
+     * @param processor the processor to be executed
+     * @param exchange  the current exchange
+     * @param callback the callback
+     * @return the callback to be used (can be wrapped)
+     */
+    AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Strategy for optional work to be executed after the callback has been processed.
+     *
+     * @param processor the processor executed
+     * @param exchange  the current exchange
+     * @param callback  the callback used
+     * @param doneSync  whether the process was done synchronously or asynchronously
+     */
+    void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync);
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Thu Feb  3 08:38:16 2011
@@ -66,8 +66,20 @@ public final class AsyncProcessorHelper 
             callback.done(true);
             sync = true;
         } else {
+            // allow unit of work to wrap callback in case it need to do some special work
+            // for example the MDCUnitOfWork
+            AsyncCallback async = callback;
+            if (exchange.getUnitOfWork() != null) {
+                async = exchange.getUnitOfWork().beforeProcess(processor, exchange, callback);
+            }
+
             // we support asynchronous routing so invoke it
-            sync = processor.process(exchange, callback);
+            sync = processor.process(exchange, async);
+
+            // execute any after processor work
+            if (exchange.getUnitOfWork() != null) {
+                exchange.getUnitOfWork().afterProcess(processor, exchange, callback, sync);
+            }
         }
 
         if (LOG.isTraceEnabled()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Thu Feb  3 08:38:16 2011
@@ -33,6 +33,8 @@ import org.apache.camel.model.ExecutorSe
 import org.apache.camel.spi.ExecutorServiceStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
@@ -49,6 +51,7 @@ import org.apache.camel.util.ObjectHelpe
 public final class ExecutorServiceHelper {
 
     public static final String DEFAULT_PATTERN = "Camel Thread ${counter} - ${name}";
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceHelper.class);
     private static AtomicLong threadCounter = new AtomicLong();
 
     private ExecutorServiceHelper() {
@@ -103,13 +106,7 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final String pattern, final String name, final boolean daemon) {
-        return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                Thread answer = new Thread(r, getThreadName(pattern, name));
-                answer.setDaemon(daemon);
-                return answer;
-            }
-        });
+        return Executors.newScheduledThreadPool(poolSize, new CamelThreadFactory(pattern, name, daemon));
     }
 
     /**
@@ -124,13 +121,7 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ExecutorService newFixedThreadPool(final int poolSize, final String pattern, final String name, final boolean daemon) {
-        return Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                Thread answer = new Thread(r, getThreadName(pattern, name));
-                answer.setDaemon(daemon);
-                return answer;
-            }
-        });
+        return Executors.newFixedThreadPool(poolSize, new CamelThreadFactory(pattern, name, daemon));
     }
 
     /**
@@ -142,13 +133,7 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ExecutorService newSingleThreadExecutor(final String pattern, final String name, final boolean daemon) {
-        return Executors.newSingleThreadExecutor(new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                Thread answer = new Thread(r, getThreadName(pattern, name));
-                answer.setDaemon(daemon);
-                return answer;
-            }
-        });
+        return Executors.newSingleThreadExecutor(new CamelThreadFactory(pattern, name, daemon));
     }
 
     /**
@@ -162,13 +147,7 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ExecutorService newCachedThreadPool(final String pattern, final String name, final boolean daemon) {
-        return Executors.newCachedThreadPool(new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                Thread answer = new Thread(r, getThreadName(pattern, name));
-                answer.setDaemon(daemon);
-                return answer;
-            }
-        });
+        return Executors.newCachedThreadPool(new CamelThreadFactory(pattern, name, daemon));
     }
 
     /**
@@ -251,13 +230,7 @@ public final class ExecutorServiceHelper
             queue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
         }
         ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue);
-        answer.setThreadFactory(new ThreadFactory() {
-            public Thread newThread(Runnable r) {
-                Thread answer = new Thread(r, getThreadName(pattern, name));
-                answer.setDaemon(daemon);
-                return answer;
-            }
-        });
+        answer.setThreadFactory(new CamelThreadFactory(pattern, name, daemon));
         if (rejectedExecutionHandler == null) {
             rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
         }
@@ -362,4 +335,31 @@ public final class ExecutorServiceHelper
         }
     }
 
+    /**
+     * Thread factory which creates threads supporting a naming pattern.
+     */
+    private static final class CamelThreadFactory implements ThreadFactory {
+
+        private final String pattern;
+        private final String name;
+        private final boolean daemon;
+
+        private CamelThreadFactory(String pattern, String name, boolean daemon) {
+            this.pattern = pattern;
+            this.name = name;
+            this.daemon = daemon;
+        }
+
+        public Thread newThread(Runnable runnable) {
+            String threadName = getThreadName(pattern, name);
+            Thread answer = new Thread(runnable, threadName);
+            answer.setDaemon(daemon);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Created thread[" + name + "]: " + answer);
+            }
+            return answer;
+        }
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultUnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultUnitOfWorkTest.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultUnitOfWorkTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultUnitOfWorkTest.java Thu Feb  3 08:38:16 2011
@@ -32,7 +32,8 @@ public class DefaultUnitOfWorkTest exten
     }
 
     public void testGetId() {
-        assertEquals("2", unitOfWork.getId());
-        assertEquals("2", unitOfWork.getId());
+        String id = unitOfWork.getId();
+        assertNotNull(id);
+        assertEquals(id, unitOfWork.getId());
     }
 }
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCTest.java (from r1066353, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java&r1=1066353&r2=1066758&rev=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCTest.java Thu Feb  3 08:38:16 2011
@@ -17,29 +17,32 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.MDC;
 
 /**
  * @version $Revision$
  */
-public class SimpleMockTest extends ContextTestSupport {
+public class MDCTest extends ContextTestSupport {
 
-    public void testSimple() throws Exception {
+    public void testMDC() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
 
-        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:a", "Hello World");
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testSimpleTwoMessages() throws Exception {
+    public void testMDCTwoMessages() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World", "Bye World");
 
-        template.sendBody("direct:start", "Hello World");
-        template.sendBody("direct:start", "Bye World");
+        template.sendBody("direct:a", "Hello World");
+        template.sendBody("direct:a", "Bye World");
 
         assertMockEndpointsSatisfied();
     }
@@ -49,7 +52,26 @@ public class SimpleMockTest extends Cont
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("log:foo").to("log:bar").to("mock:result");
+                // enable MDC
+                context.setUseMDCLogging(true);
+
+                from("direct:a").routeId("route-a")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-a", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
+                        .to("log:foo").to("direct:b");
+
+                from("direct:b").routeId("route-b")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-b", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
+                        .to("log:bar").to("mock:result");
             }
         };
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java?rev=1066758&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java Thu Feb  3 08:38:16 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.slf4j.MDC;
+
+/**
+ * @version $Revision: 807500 $
+ */
+public class MDCWireTapTest extends ContextTestSupport {
+
+    public void testMDC() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+
+        template.sendBody("direct:a", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable MDC
+                context.setUseMDCLogging(true);
+
+                from("direct:a").routeId("route-a")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            assertEquals("route-a", MDC.get("routeId"));
+                            assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                        }
+                    })
+                    .to("log:before-wiretap")
+                    .wireTap("direct:b")
+                    .delay(2000)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-a", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
+                    .to("log:a-done")
+                    .to("mock:a");
+
+                from("direct:b").routeId("route-b")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-b", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
+                    .to("log:b-done").to("mock:b");
+            }
+        };
+    }
+}

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncMDCTest.java (from r1066353, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncMDCTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncMDCTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java&r1=1066353&r2=1066758&rev=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncMDCTest.java Thu Feb  3 08:38:16 2011
@@ -20,26 +20,37 @@ import org.apache.camel.ContextTestSuppo
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.MDC;
 
 /**
  * @version $Revision$
  */
-public class AsyncEndpointCBRTest extends ContextTestSupport {
+public class AsyncMDCTest extends ContextTestSupport {
 
-    private static String beforeThreadName;
-    private static String afterThreadName;
+    public void testMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye Camel");
 
-    public void testAsyncEndpoint() throws Exception {
-        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
-        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
-
-        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
-        assertEquals("Bye Camel", reply);
+        template.sendBody("direct:a", "Hello World");
 
         assertMockEndpointsSatisfied();
+    }
+
+    public void testThreeMessagesMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye Camel", "Bye Camel", "Bye Camel");
+
+        log.info("#1 message");
+        template.sendBody("direct:a", "Hello World");
 
-        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        log.info("#2 message");
+        template.sendBody("direct:a", "Hello Camel");
+
+        log.info("#3 message");
+        template.sendBody("direct:a", "Hi Camel");
+
+        assertMockEndpointsSatisfied();
     }
 
     @Override
@@ -47,28 +58,31 @@ public class AsyncEndpointCBRTest extend
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                // enable MDC
+                context.setUseMDCLogging(true);
+
                 context.addComponent("async", new MyAsyncComponent());
 
-                from("direct:start")
-                        .to("mock:before")
+                from("direct:a").routeId("route-a")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-a", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
                         .to("log:before")
-                        .choice()
-                            .when(body().contains("Camel"))
-                                .process(new Processor() {
-                                    public void process(Exchange exchange) throws Exception {
-                                        beforeThreadName = Thread.currentThread().getName();
-                                    }
-                                })
-                                .to("async:Bye Camel")
-                                .process(new Processor() {
-                                    public void process(Exchange exchange) throws Exception {
-                                        afterThreadName = Thread.currentThread().getName();
-                                    }
-                                })
-                                .to("log:after")
-                                .to("mock:after")
-                            .end()
-                        .to("mock:result");
+                        .to("async:Bye Camel")
+                        .to("log:after")
+                        .to("direct:b");
+
+                from("direct:b").routeId("route-b")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                assertEquals("route-b", MDC.get("routeId"));
+                                assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+                            }
+                        })
+                        .to("log:bar").to("mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java Thu Feb  3 08:38:16 2011
@@ -34,11 +34,12 @@ import org.slf4j.LoggerFactory;
 public class MyAsyncProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(MyAsyncProducer.class);
-    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final ExecutorService executor;
     private final AtomicInteger counter = new AtomicInteger();
 
     public MyAsyncProducer(MyAsyncEndpoint endpoint) {
         super(endpoint);
+        this.executor = endpoint.getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "MyProducer");
     }
 
     public MyAsyncEndpoint getEndpoint() {
@@ -48,6 +49,7 @@ public class MyAsyncProducer extends Def
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         executor.submit(new Callable<Object>() {
             public Object call() throws Exception {
+
                 LOG.info("Simulating a task which takes " + getEndpoint().getDelay() + " millis to reply");
                 Thread.sleep(getEndpoint().getDelay());
 

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Thu Feb  3 08:38:16 2011
@@ -53,12 +53,16 @@ log4j.logger.org.apache.camel.impl.Defau
 log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+# MDC
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %-10.10X{exchangeId} - %-10.10X{correlationId} - %-10.10X{routeId} - %m%n
 
 # File appender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 log4j.appender.file.file=target/camel-core-test.log
 log4j.appender.file.append=true
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+# MDC
+#log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %-10.10X{exchangeId} - %-10.10X{correlationId} - %-10.10X{routeId} - %m%n
 
 log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer
\ No newline at end of file

Modified: camel/trunk/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java Thu Feb  3 08:38:16 2011
@@ -88,6 +88,8 @@ public class CamelContextFactoryBean ext
     @XmlAttribute(required = false)
     private String autoStartup = "true";
     @XmlAttribute(required = false)
+    private String useMDCLogging;
+    @XmlAttribute(required = false)
     private ShutdownRoute shutdownRoute;
     @XmlAttribute(required = false)
     private ShutdownRunningTask shutdownRunningTask;
@@ -248,6 +250,14 @@ public class CamelContextFactoryBean ext
         this.autoStartup = autoStartup;
     }
 
+    public String getUseMDCLogging() {
+        return useMDCLogging;
+    }
+
+    public void setUseMDCLogging(String useMDCLogging) {
+        this.useMDCLogging = useMDCLogging;
+    }
+
     public Boolean getLazyLoadTypeConverters() {
         return lazyLoadTypeConverters;
     }

Modified: camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java Thu Feb  3 08:38:16 2011
@@ -41,7 +41,6 @@ import org.apache.camel.management.Defau
 import org.apache.camel.management.DefaultManagementStrategy;
 import org.apache.camel.management.ManagedManagementStrategy;
 import org.apache.camel.model.ContextScanDefinition;
-import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.IdentifiedType;
 import org.apache.camel.model.InterceptDefinition;
 import org.apache.camel.model.InterceptFromDefinition;
@@ -49,14 +48,12 @@ import org.apache.camel.model.InterceptS
 import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.model.PackageScanDefinition;
-import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteBuilderDefinition;
 import org.apache.camel.model.RouteContainer;
 import org.apache.camel.model.RouteContextRefDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteDefinitionHelper;
 import org.apache.camel.model.ThreadPoolProfileDefinition;
-import org.apache.camel.model.TransactedDefinition;
 import org.apache.camel.model.config.PropertiesDefinition;
 import org.apache.camel.model.dataformat.DataFormatsDefinition;
 import org.apache.camel.processor.interceptor.Delayer;
@@ -82,7 +79,6 @@ import org.apache.camel.spi.ShutdownStra
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -442,6 +438,8 @@ public abstract class AbstractCamelConte
 
     public abstract String getAutoStartup();
 
+    public abstract String getUseMDCLogging();
+
     public abstract Boolean getLazyLoadTypeConverters();
 
     public abstract CamelJMXAgentDefinition getCamelJMXAgent();
@@ -494,6 +492,9 @@ public abstract class AbstractCamelConte
         if (getAutoStartup() != null) {
             ctx.setAutoStartup(CamelContextHelper.parseBoolean(getContext(), getAutoStartup()));
         }
+        if (getUseMDCLogging() != null) {
+            ctx.setUseMDCLogging(CamelContextHelper.parseBoolean(getContext(), getUseMDCLogging()));
+        }
         if (getShutdownRoute() != null) {
             ctx.setShutdownRoute(getShutdownRoute());
         }

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Thu Feb  3 08:38:16 2011
@@ -36,7 +36,6 @@ import org.apache.camel.core.xml.CamelJM
 import org.apache.camel.core.xml.CamelPropertyPlaceholderDefinition;
 import org.apache.camel.core.xml.CamelProxyFactoryDefinition;
 import org.apache.camel.core.xml.CamelServiceExporterDefinition;
-import org.apache.camel.impl.DefaultCamelContextNameStrategy;
 import org.apache.camel.model.ContextScanDefinition;
 import org.apache.camel.model.InterceptDefinition;
 import org.apache.camel.model.InterceptFromDefinition;
@@ -52,7 +51,6 @@ import org.apache.camel.model.config.Pro
 import org.apache.camel.model.dataformat.DataFormatsDefinition;
 import org.apache.camel.spi.PackageScanFilter;
 import org.apache.camel.spi.Registry;
-import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
@@ -97,6 +95,8 @@ public class CamelContextFactoryBean ext
     @XmlAttribute(required = false)
     private String autoStartup;
     @XmlAttribute(required = false)
+    private String useMDCLogging;
+    @XmlAttribute(required = false)
     private ShutdownRoute shutdownRoute;
     @XmlAttribute(required = false)
     private ShutdownRunningTask shutdownRunningTask;
@@ -443,6 +443,14 @@ public class CamelContextFactoryBean ext
         this.autoStartup = autoStartup;
     }
 
+    public String getUseMDCLogging() {
+        return useMDCLogging;
+    }
+
+    public void setUseMDCLogging(String useMDCLogging) {
+        this.useMDCLogging = useMDCLogging;
+    }
+
     public Boolean getLazyLoadTypeConverters() {
         return lazyLoadTypeConverters;
     }

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java Thu Feb  3 08:38:16 2011
@@ -40,6 +40,7 @@ import org.springframework.transaction.s
 public class TransactionErrorHandler extends RedeliveryErrorHandler {
 
     private final TransactionTemplate transactionTemplate;
+    private final String transactionKey;
 
     /**
      * Creates the transaction error handler.
@@ -60,6 +61,7 @@ public class TransactionErrorHandler ext
         super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, handledPolicy, null, null, false, retryWhile);
         setExceptionPolicy(exceptionPolicyStrategy);
         this.transactionTemplate = transactionTemplate;
+        this.transactionKey = ObjectHelper.getIdentityHashCode(transactionTemplate);
     }
 
     public boolean supportTransacted() {
@@ -81,7 +83,7 @@ public class TransactionErrorHandler ext
     public void process(Exchange exchange) throws Exception {
         // we have to run this synchronously as Spring Transaction does *not* support
         // using multiple threads to span a transaction
-        if (exchange.getUnitOfWork().isTransactedBy(transactionTemplate)) {
+        if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
             // already transacted by this transaction template
             // so lets just let the error handler process it
             processByErrorHandler(exchange);
@@ -108,31 +110,30 @@ public class TransactionErrorHandler ext
     }
 
     protected void processInTransaction(final Exchange exchange) throws Exception {
-        String id = ObjectHelper.getIdentityHashCode(transactionTemplate);
         try {
             // mark the beginning of this transaction boundary
-            exchange.getUnitOfWork().beginTransactedBy(transactionTemplate);
+            exchange.getUnitOfWork().beginTransactedBy(transactionKey);
 
             if (log.isDebugEnabled()) {
-                log.debug("Transaction begin (" + id + ") for ExchangeId: " + exchange.getExchangeId());
+                log.debug("Transaction begin (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId());
             }
 
             doInTransactionTemplate(exchange);
 
             if (log.isDebugEnabled()) {
-                log.debug("Transaction commit (" + id + ") for ExchangeId: " + exchange.getExchangeId());
+                log.debug("Transaction commit (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId());
             }
         } catch (TransactionRollbackException e) {
             // ignore as its just a dummy exception to force spring TX to rollback
             if (log.isDebugEnabled()) {
-                log.debug("Transaction rollback (" + id + ") for ExchangeId: " + exchange.getExchangeId() + " due exchange was marked for rollbackOnly");
+                log.debug("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId() + " due exchange was marked for rollbackOnly");
             }
         } catch (Exception e) {
-            log.warn("Transaction rollback (" + id + ") for ExchangeId: " + exchange.getExchangeId() + " due exception: " + e.getMessage());
+            log.warn("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId() + " due exception: " + e.getMessage());
             exchange.setException(e);
         } finally {
             // mark the end of this transaction boundary
-            exchange.getUnitOfWork().endTransactedBy(transactionTemplate);
+            exchange.getUnitOfWork().endTransactedBy(transactionKey);
         }
 
         // if it was a local rollback only then remove its marker so outer transaction wont see the marker
@@ -142,10 +143,10 @@ public class TransactionErrorHandler ext
                 // log exception if there was a cause exception so we have the stacktrace
                 Exception cause = exchange.getException();
                 if (cause != null) {
-                    log.debug("Transaction rollback (" + id + ") for ExchangeId: " + exchange.getExchangeId()
+                    log.debug("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId()
                         + " due exchange was marked for rollbackOnlyLast and due exception: ", cause);
                 } else {
-                    log.debug("Transaction rollback (" + id + ") for ExchangeId: " + exchange.getExchangeId()
+                    log.debug("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId()
                         + " due exchange was marked for rollbackOnlyLast");
                 }
             }

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringMDCTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringMDCTest.java?rev=1066758&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringMDCTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringMDCTest.java Thu Feb  3 08:38:16 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.slf4j.MDC;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringMDCTest extends SpringTestSupport {
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/spring/SpringMDCTest.xml");
+    }
+
+    public void testMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:a", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMDCTwoMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("direct:a", "Hello World");
+        template.sendBody("direct:a", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class ProcessorA implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            assertEquals("route-a", MDC.get("routeId"));
+            assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+        }
+    }
+
+    public static class ProcessorB implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            assertEquals("route-b", MDC.get("routeId"));
+            assertEquals(exchange.getExchangeId(), MDC.get("exchangeId"));
+        }
+    }
+
+}

Modified: camel/trunk/components/camel-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/log4j.properties?rev=1066758&r1=1066757&r2=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-spring/src/test/resources/log4j.properties Thu Feb  3 08:38:16 2011
@@ -33,8 +33,9 @@ log4j.logger.org.apache.camel.impl.Defau
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
-#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+# MDC
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %-10.10X{exchangeId} - %-10.10X{correlationId} - %-10.10X{routeId} - %m%n
 
 # File appender
 log4j.appender.file=org.apache.log4j.FileAppender

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringMDCTest.xml (from r1066732, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/defaultJmxConfig.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringMDCTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringMDCTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/defaultJmxConfig.xml&r1=1066732&r2=1066758&rev=1066758&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/defaultJmxConfig.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringMDCTest.xml Thu Feb  3 08:38:16 2011
@@ -22,13 +22,25 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-  <!-- START SNIPPET: example -->
-  <camelContext xmlns="http://camel.apache.org/schema/spring">
-    <route id="foo" autoStartup="false">
-      <from uri="seda:start"/>
-      <to uri="mock:result"/>
-    </route>
-  </camelContext>
-  <!-- END SNIPPET: example -->
+    <bean id="processorA" class="org.apache.camel.spring.SpringMDCTest$ProcessorA"/>
+    <bean id="processorB" class="org.apache.camel.spring.SpringMDCTest$ProcessorB"/>
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring" useMDCLogging="true">
+
+        <route id="route-a">
+            <from uri="direct:a"/>
+            <process ref="processorA"/>
+            <to uri="log:foo"/>
+            <to uri="direct:b"/>
+        </route>
+
+        <route id="route-b">
+            <from uri="direct:b"/>
+            <process ref="processorB"/>
+            <to uri="log:bar"/>
+            <to uri="mock:result"/>
+        </route>
+
+    </camelContext>
 
 </beans>



Mime
View raw message