camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r581960 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/
Date Thu, 04 Oct 2007 16:23:27 GMT
Author: chirino
Date: Thu Oct  4 09:23:26 2007
New Revision: 581960

URL: http://svn.apache.org/viewvc?rev=581960&view=rev
Log:
Update UnitOfWork processor so that consumers can participate in the UoW

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=581960&r1=581959&r2=581960&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Oct  4 09:23:26 2007
@@ -172,6 +172,12 @@
     UnitOfWork getUnitOfWork();
 
     /**
+     * Sets the unit of work that this exchange belongs to; which may map to
+     * zero, one or more physical transactions
+     */
+    void setUnitOfWork(UnitOfWork unitOfWork);
+
+    /**
      * Returns the exchange id
      *
      * @return the unique id of the exchange

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=581960&r1=581959&r2=581960&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Thu Oct  4 09:23:26 2007
@@ -253,10 +253,11 @@
     }
 
     public UnitOfWork getUnitOfWork() {
-        if (unitOfWork == null) {
-            unitOfWork = new DefaultUnitOfWork();
-        }
         return unitOfWork;
+    }
+
+    public void setUnitOfWork(UnitOfWork unitOfWork) {
+        this.unitOfWork = unitOfWork;
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=581960&r1=581959&r2=581960&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
Thu Oct  4 09:23:26 2007
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=581960&r1=581959&r2=581960&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Thu Oct  4 09:23:26 2007
@@ -1,8 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.spi.UnitOfWork;
 
 /** 
  * Handles calling the UnitOfWork.done() method when processing of an exchange
@@ -15,14 +33,25 @@
     }
     
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        return processor.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                // Order here matters. We need to complete the callbacks since
-                // they will likely update the exchange with some final results.
-                callback.done(sync);
-                exchange.getUnitOfWork().done(exchange);
-            }
-        });
+        if (exchange.getUnitOfWork() == null) {
+            // If there is no existing UoW, then we should start one and
+            // terminate it once processing is completed for the exchange.
+            exchange.setUnitOfWork(new DefaultUnitOfWork());
+            return processor.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    // Order here matters. We need to complete the callbacks
+                    // since they will likely update the exchange with 
+                    // some final results.
+                    callback.done(sync);
+                    exchange.getUnitOfWork().done(exchange);
+                    exchange.setUnitOfWork(null);
+                }
+            });
+        } else {
+            // There was an existing UoW, so we should just pass through
+            // so that whoever initiated the UoW can terminate it.
+            return processor.process(exchange, callback);
+        }
     }
 
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java?rev=581960&r1=581959&r2=581960&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
Thu Oct  4 09:23:26 2007
@@ -81,7 +81,6 @@
         template.send(uri, new Processor() {
             public void process(Exchange exchange) throws Exception {
                 exchange.getIn().setBody("<hello>world!</hello>");
-                exchange.getUnitOfWork().addSynchronization(synchronization);
             }
         });
     }
@@ -93,6 +92,7 @@
                 from("direct:foo").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         log.info("Received: " + exchange);
+                        exchange.getUnitOfWork().addSynchronization(synchronization);
 
                         String name = getName();
                         if (name.equals("testFail")) {



Mime
View raw message