camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r836215 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: model/InterceptDefinition.java processor/BatchProcessor.java processor/OnCompletionProcessor.java processor/RecipientList.java
Date Sat, 14 Nov 2009 17:10:07 GMT
Author: davsclaus
Date: Sat Nov 14 17:10:06 2009
New Revision: 836215

URL: http://svn.apache.org/viewvc?rev=836215&view=rev
Log:
CAMEL-2170: Ensured using UnitOfWork for EIPs that emit new Exchanges on new routes which
didn't have that beforehand

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java?rev=836215&r1=836214&r2=836215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java Sat Nov 14 17:10:06 2009
@@ -95,7 +95,7 @@
 
             @Override
             public String toString() {
-                return "intercept[" +  (interceptedTarget != null ? interceptedTarget : output) + "]";
+                return "intercept[" + (interceptedTarget != null ? interceptedTarget : output) + "]";
             }
         });
 
@@ -108,7 +108,7 @@
     /**
      * Applies this interceptor only if the given predicate is true
      *
-     * @param predicate  the predicate
+     * @param predicate the predicate
      * @return the builder
      */
     public ChoiceDefinition when(Predicate predicate) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=836215&r1=836214&r2=836215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sat Nov 14 17:10:06 2009
@@ -67,7 +67,9 @@
     public BatchProcessor(Processor processor, Collection<Exchange> collection) {
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(collection, "collection");
-        this.processor = processor;
+
+        // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
+        this.processor = new UnitOfWorkProcessor(processor);
         this.collection = collection;
         this.sender = new BatchSender();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=836215&r1=836214&r2=836215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Sat Nov 14 17:10:06 2009
@@ -45,7 +45,8 @@
     private Predicate onWhen;
 
     public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
-        this.processor = processor;
+        // wrap processor in UnitOfWork so what we send out runs in a UoW
+        this.processor = new UnitOfWorkProcessor(processor);
         this.onCompleteOnly = onCompleteOnly;
         this.onFailureOnly = onFailureOnly;
         this.onWhen = onWhen;
@@ -91,7 +92,7 @@
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Processing onComplete: " + copy);
                         }
-                        processor.process(copy);
+                        doProcess(processor, copy);
                         return copy;
                     }
                 });
@@ -118,8 +119,7 @@
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Processing onFailure: " + copy);
                         }
-
-                        processor.process(copy);
+                        doProcess(processor, copy);
                         return copy;
                     }
                 });
@@ -139,6 +139,21 @@
     }
 
     /**
+     * Processes the exchange by the processors
+     *
+     * @param processor the processor
+     * @param exchange the exchange
+     */
+    protected static void doProcess(Processor processor, Exchange exchange) {
+        try {
+            processor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+    }
+
+
+    /**
      * Prepares the {@link Exchange} to send as onCompletion.
      *
      * @param exchange the current exchange
@@ -175,6 +190,6 @@
     }
 
     public String getTraceLabel() {
-        return "OnCompletion";
+        return "onCompletion";
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=836215&r1=836214&r2=836215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sat Nov 14 17:10:06 2009
@@ -81,7 +81,9 @@
             Object recipient = iter.next();
             Endpoint endpoint = resolveEndpoint(exchange, recipient);
             Producer producer = getProducerCache(exchange).getProducer(endpoint);
-            processors.add(producer);
+            // wrap in unit of work
+            Processor target = new UnitOfWorkProcessor(producer);
+            processors.add(target);
         }
         MulticastProcessor mp = new MulticastProcessor(processors, new UseLatestAggregationStrategy());
         mp.process(exchange);



Mime
View raw message