camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/3] git commit: CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in progress.
Date Sun, 26 May 2013 14:35:39 GMT
Updated Branches:
  refs/heads/master bd3f409eb -> 4653c9531


CAMEL-6377: Optimized routing engine to reduce stack frames in use during routing. Work in
progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6fbd8a75
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6fbd8a75
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6fbd8a75

Branch: refs/heads/master
Commit: 6fbd8a75af925811c7272eb207bfe555366dd2c8
Parents: bd3f409
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun May 26 15:29:40 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun May 26 15:29:40 2013 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/DefaultExchange.java     |    2 +-
 .../apache/camel/impl/DefaultProducerTemplate.java |   12 +-
 .../apache/camel/model/AggregateDefinition.java    |    1 -
 .../apache/camel/model/MulticastDefinition.java    |    7 +-
 .../org/apache/camel/model/SplitDefinition.java    |    6 +-
 .../camel/processor/CamelInternalProcessor.java    |   15 +
 .../apache/camel/processor/MulticastProcessor.java |    2 +-
 .../org/apache/camel/processor/RecipientList.java  |    4 +-
 .../camel/processor/SubUnitOfWorkProcessor.java    |   84 ------
 .../camel/processor/UnitOfWorkProcessor.java       |  222 ---------------
 10 files changed, 34 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index 9f1abb2..b755b70 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -374,7 +374,7 @@ public final class DefaultExchange implements Exchange {
     public void addOnCompletion(Synchronization onCompletion) {
         if (unitOfWork == null) {
             // unit of work not yet registered so we store the on completion temporary
-            // until the unit of work is assigned to this exchange by the UnitOfWorkProcessor
+            // until the unit of work is assigned to this exchange by the unit of work
             if (onCompletions == null) {
                 onCompletions = new ArrayList<Synchronization>();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index d726a4e..bce26e8 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -619,9 +619,9 @@ public class DefaultProducerTemplate extends ServiceSupport implements
ProducerT
                 Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body));
 
                 // invoke callback before returning answer
-                // as it allows callback to be used without UnitOfWorkProcessor invoking
it
+                // as it allows callback to be used without unit of work invoking it
                 // and thus it works directly from a producer template as well, as opposed
-                // to the UnitOfWorkProcessor that is injected in routes
+                // to the unit of work that is injected in routes
                 if (answer.isFailed()) {
                     onCompletion.onFailure(answer);
                 } else {
@@ -647,9 +647,9 @@ public class DefaultProducerTemplate extends ServiceSupport implements
ProducerT
                 send(endpoint, exchange);
 
                 // invoke callback before returning answer
-                // as it allows callback to be used without UnitOfWorkProcessor invoking
it
+                // as it allows callback to be used without unit of work invoking it
                 // and thus it works directly from a producer template as well, as opposed
-                // to the UnitOfWorkProcessor that is injected in routes
+                // to the unit of work that is injected in routes
                 if (exchange.isFailed()) {
                     onCompletion.onFailure(exchange);
                 } else {
@@ -668,9 +668,9 @@ public class DefaultProducerTemplate extends ServiceSupport implements
ProducerT
                 Exchange answer = send(endpoint, processor);
 
                 // invoke callback before returning answer
-                // as it allows callback to be used without UnitOfWorkProcessor invoking
it
+                // as it allows callback to be used without unit of work invoking it
                 // and thus it works directly from a producer template as well, as opposed
-                // to the UnitOfWorkProcessor that is injected in routes
+                // to the unit of work that is injected in routes
                 if (answer.isFailed()) {
                     onCompletion.onFailure(answer);
                 } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 338d2fe..d6fb50e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -35,7 +35,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.CamelInternalProcessor;
-import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 916f18d..834eb96 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -18,7 +18,6 @@ package org.apache.camel.model;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -26,8 +25,8 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Processor;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.MulticastProcessor;
-import org.apache.camel.processor.SubUnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
@@ -231,7 +230,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
i
                                       threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
timeout, onPrepare, isShareUnitOfWork());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of work
-            return new SubUnitOfWorkProcessor(answer);
+            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
+            internalProcessor.addTask(new CamelInternalProcessor.SubUnitOfWorkProcessorTask());
+            return internalProcessor;
         }
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index afc6bd1..71eaa89 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -27,8 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.Splitter;
-import org.apache.camel.processor.SubUnitOfWorkProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
@@ -113,7 +113,9 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
                             timeout, onPrepare, isShareUnitOfWork());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of work
-            return new SubUnitOfWorkProcessor(answer);
+            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
+            internalProcessor.addTask(new CamelInternalProcessor.SubUnitOfWorkProcessorTask());
+            return internalProcessor;
         }
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 97d0955..e39ac29 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -551,7 +551,22 @@ public final class CamelInternalProcessor extends DelegateAsyncProcessor
{
             return parent.createChildUnitOfWork(exchange);
         }
 
+    }
+
+    public static class SubUnitOfWorkProcessorTask implements CamelInternalProcessorTask<UnitOfWork>
{
 
+        @Override
+        public UnitOfWork before(Exchange exchange) throws Exception {
+            // begin savepoint
+            exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
+            return exchange.getUnitOfWork();
+        }
+
+        @Override
+        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
+            // end sub unit of work
+            unitOfWork.endSubUnitOfWork(exchange);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f31188e..6549402 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -895,7 +895,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     /**
-     * Strategy to create the {@link UnitOfWorkProcessor} to be used for the sub route
+     * Strategy to create the unit of work to be used for the sub route
      *
      * @param routeContext the route context
      * @param processor    the processor

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 262fbc9..e4008d6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -144,7 +144,9 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor
{
         AsyncProcessor target = rlp;
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of work
-            target = new SubUnitOfWorkProcessor(rlp);
+            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp);
+            internalProcessor.addTask(new CamelInternalProcessor.SubUnitOfWorkProcessorTask());
+            target = internalProcessor;
         }
 
         // now let the multicast process the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/processor/SubUnitOfWorkProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SubUnitOfWorkProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/SubUnitOfWorkProcessor.java
deleted file mode 100644
index 6cfb4dc..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/SubUnitOfWorkProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.spi.RouteContext;
-
-/**
- * A processor that processes the processor in a {@link org.apache.camel.spi.SubUnitOfWork}
context.
- * <p/>
- * This processor ensures the {@link org.apache.camel.spi.UnitOfWork#beginSubUnitOfWork(org.apache.camel.Exchange)}
- * and {@link org.apache.camel.spi.UnitOfWork#endSubUnitOfWork(org.apache.camel.Exchange)}
is executed.
- *
- * @see org.apache.camel.spi.SubUnitOfWork
- * @see org.apache.camel.spi.SubUnitOfWorkCallback
- * @deprecated is to be removed in the future, as replaced by {@link CamelInternalProcessor}
- */
-@Deprecated
-public class SubUnitOfWorkProcessor extends UnitOfWorkProcessor {
-
-    // See code comment in DefaultUnitOfWork for reasons why this implementation is named
SubUnitOfWorkProcessor
-
-    public SubUnitOfWorkProcessor(Processor processor) {
-        super(processor);
-    }
-
-    public SubUnitOfWorkProcessor(AsyncProcessor processor) {
-        super(processor);
-    }
-
-    public SubUnitOfWorkProcessor(RouteContext routeContext, Processor processor) {
-        super(routeContext, processor);
-    }
-
-    public SubUnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor processor) {
-        super(routeContext, processor);
-    }
-
-    @Override
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // begin savepoint
-        exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
-        // process the exchange
-        return super.process(exchange, new AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
-                try {
-                    // end sub unit of work
-                    exchange.getUnitOfWork().endSubUnitOfWork(exchange);
-                } finally {
-                    // must ensure callback is invoked
-                    callback.done(doneSync);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return "SubUnitOfWorkCallback";
-            }
-        });
-    }
-
-    @Override
-    public String toString() {
-        return "SubUnitOfWorkProcessor[" + processor + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/6fbd8a75/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
deleted file mode 100644
index c04bfa9..0000000
--- a/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.MDCUnitOfWork;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.UnitOfWork;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Ensures the {@link Exchange} is routed under the boundaries of an {@link org.apache.camel.spi.UnitOfWork}.
- * <p/>
- * Handles calling the {@link org.apache.camel.spi.UnitOfWork#done(org.apache.camel.Exchange)}
method
- * when processing of an {@link Exchange} is complete.
- *
- * @deprecated is to be removed in the future, as replaced by {@link CamelInternalProcessor}
- */
-@Deprecated
-public class UnitOfWorkProcessor extends DelegateAsyncProcessor {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(UnitOfWorkProcessor.class);
-    private final RouteContext routeContext;
-    private final String routeId;
-
-    public UnitOfWorkProcessor(Processor processor) {
-        this(null, processor);
-    }
-
-    public UnitOfWorkProcessor(AsyncProcessor processor) {
-        this(null, processor);
-    }
-
-    public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) {
-        super(processor);
-        this.routeContext = routeContext;
-        if (routeContext != null) {
-            this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
-        } else {
-            this.routeId = null;
-        }
-    }
-
-    public UnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor processor) {
-        super(processor);
-        this.routeContext = routeContext;
-        if (routeContext != null) {
-            this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
-        } else {
-            this.routeId = null;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "UnitOfWork(" + processor + ")";
-    }
-
-    public RouteContext getRouteContext() {
-        return routeContext;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        // if a route context has been configured, then wrap the processor with a
-        // RouteContextProcessor to ensure we track the route context properly during
-        // processing of the exchange, but only do this once
-        // TODO: This can possible be removed!
-        if (routeContext != null && (!(processor instanceof RouteContextProcessor)))
{
-            processor = new RouteContextProcessor(routeContext, processor);
-        }
-        super.doStart();
-    }
-
-    @Override
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // if the exchange doesn't have from route id set, then set it if it originated
-        // from this unit of work
-        if (routeId != null && exchange.getFromRouteId() == null) {
-            exchange.setFromRouteId(routeId);
-        }
-
-        if (exchange.getUnitOfWork() == null) {
-            // If there is no existing UoW, then we should start one and
-            // terminate it once processing is completed for the exchange.
-            final UnitOfWork uow = createUnitOfWork(exchange);
-            exchange.setUnitOfWork(uow);
-            try {
-                uow.start();
-            } catch (Exception e) {
-                callback.done(true);
-                exchange.setException(e);
-                return true;
-            }
-
-            Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
-            if (synchronous != null) {
-                // the exchange signalled to process synchronously
-                return processSync(exchange, callback, uow);
-            } else {
-                return processAsync(exchange, callback, uow);
-            }
-        } else {
-            // There was an existing UoW, so we should just pass through..
-            // so that the guy the initiated the UoW can terminate it.
-            return processor.process(exchange, callback);
-        }
-    }
-
-    protected boolean processSync(final Exchange exchange, final AsyncCallback callback,
final UnitOfWork uow) {
-        LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
-
-        // process the exchange synchronously
-        try {
-            AsyncProcessorHelper.process(processor, exchange);
-        } catch (Throwable e) {
-            exchange.setException(e);
-        }
-
-        try {
-            callback.done(true);
-        } finally {
-            doneUow(uow, exchange);
-        }
-
-        return true;
-    }
-
-    protected boolean processAsync(final Exchange exchange, final AsyncCallback callback,
final UnitOfWork uow) {
-        LOG.trace("Processing exchange asynchronously: {}", exchange);
-
-        // process the exchange asynchronously
-        try {
-            return processor.process(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // Order here matters. We need to complete the callbacks
-                    // since they will likely update the exchange with some final results.
-                    try {
-                        callback.done(doneSync);
-                    } finally {
-                        doneUow(uow, exchange);
-                    }
-                }
-            });
-        } catch (Throwable e) {
-            LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(),
e);
-
-            // fallback and catch any exceptions the process may not have caught
-            // we must ensure to done the UoW in all cases and issue done on the callback
-            exchange.setException(e);
-
-            // Order here matters. We need to complete the callbacks
-            // since they will likely update the exchange with some final results.
-            try {
-                callback.done(true);
-            } finally {
-                doneUow(uow, exchange);
-            }
-            return true;
-        }
-    }
-
-    /**
-     * Strategy to create the unit of work for the given exchange.
-     *
-     * @param exchange the exchange
-     * @return the created unit of work
-     */
-    protected UnitOfWork createUnitOfWork(Exchange exchange) {
-        UnitOfWork answer;
-        if (exchange.getContext().isUseMDCLogging()) {
-            answer = new MDCUnitOfWork(exchange);
-        } else {
-            answer = new DefaultUnitOfWork(exchange);
-        }
-        return answer;
-    }
-
-    private void doneUow(UnitOfWork uow, Exchange exchange) {
-        // unit of work is done
-        try {
-            if (uow != null) {
-                uow.done(exchange);
-            }
-        } catch (Throwable e) {
-            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
-                    + ". This exception will be ignored.", e);
-        }
-        try {
-            if (uow != null) {
-                uow.stop();
-            }
-        } catch (Throwable e) {
-            LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
-                    + ". This exception will be ignored.", e);
-        }
-
-        // remove uow from exchange as its done
-        exchange.setUnitOfWork(null);
-    }
-
-}


Mime
View raw message