camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r581937 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/impl/converter/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/ test/java/org/a...
Date Thu, 04 Oct 2007 14:49:04 GMT
Author: chirino
Date: Thu Oct  4 07:49:03 2007
New Revision: 581937

URL: http://svn.apache.org/viewvc?rev=581937&view=rev
Log:
Finishing off a better UnitOfWork implementation.  Also added an easy-to-use DelegateAsyncProcessor
base class.


Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
  (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
  (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Thu Oct  4 07:49:03 2007
@@ -253,6 +253,9 @@
     }
 
     public UnitOfWork getUnitOfWork() {
+        if (unitOfWork == null) {
+            unitOfWork = new DefaultUnitOfWork();
+        }
         return unitOfWork;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
Thu Oct  4 07:49:03 2007
@@ -17,15 +17,15 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * The default implementation of {@link UnitOfWork}
  *
@@ -33,12 +33,10 @@
  */
 public class DefaultUnitOfWork implements UnitOfWork {
     private List<Synchronization> synchronizations;
-    private final Exchange exchange;
     private List<AsyncCallback> asyncCallbacks;
     private CountDownLatch latch;
 
-    public DefaultUnitOfWork(Exchange exchange) {
-        this.exchange = exchange;
+    public DefaultUnitOfWork() {
     }
 
     public synchronized void addSynchronization(Synchronization synchronization) {
@@ -54,14 +52,21 @@
         }
     }
 
-
     public void reset() {
 
     }
 
-    public void doneSynchronous() {
-        if (isSynchronous()) {
-            onComplete();
+    public void done(Exchange exchange) {
+        if (synchronizations != null) {
+            boolean failed = exchange.isFailed();
+            for (Synchronization synchronization : synchronizations) {
+                if (failed) {
+                    synchronization.onFailure(exchange);
+                }
+                else {
+                    synchronization.onComplete(exchange);
+                }
+            }
         }
     }
 
@@ -72,10 +77,11 @@
     /**
      * Register some asynchronous processing step
      */
+    /*
     public synchronized AsyncCallback addAsyncStep() {
         AsyncCallback answer = new AsyncCallback() {
             public void done(boolean doneSynchronously) {
-                latch.countDown();    
+                latch.countDown();
             }
         };
         if (latch == null) {
@@ -90,11 +96,5 @@
         asyncCallbacks.add(answer);
         return answer;
     }
-    protected synchronized void onComplete() {
-        if (synchronizations != null) {
-            for (Synchronization synchronization : synchronizations) {
-                synchronization.onComplete(exchange);
-            }
-        }
-    }
+    */
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RouteContext.java
Thu Oct  4 07:49:03 2007
@@ -16,17 +16,24 @@
  */
 package org.apache.camel.impl;
 
-import org.apache.camel.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.NoSuchEndpointException;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.FromType;
 import org.apache.camel.model.ProcessorType;
 import org.apache.camel.model.RouteType;
 import org.apache.camel.processor.Interceptor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.ProceedProcessor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.camel.processor.UnitOfWorkProcessor;
 
 /**
  * The context used to activate new routing rules
@@ -114,7 +121,13 @@
         // single route
         if (!eventDrivenProcessors.isEmpty()) {
             Processor processor = Pipeline.newInstance(eventDrivenProcessors);
-            routes.add(new EventDrivenConsumerRoute(getEndpoint(), processor));
+
+            // lets create the async processor
+            final AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+            Processor unitOfWorkProcessor = new UnitOfWorkProcessor(asyncProcessor);
+
+            routes.add(new EventDrivenConsumerRoute(getEndpoint(), unitOfWorkProcessor));
+            //routes.add(new EventDrivenConsumerRoute(getEndpoint(), processor));
         }
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
Thu Oct  4 07:49:03 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.processor.DelegateProcessor;
 
 /**
  * A simple converter that can convert any Processor to an AsyncProcessor.
@@ -32,11 +33,10 @@
  */
 public class AsyncProcessorTypeConverter implements TypeConverter {
 
-    public static final class ProcessorToAsynProcessorBridge implements AsyncProcessor, Service {
-        private final Processor processor;
+    public static final class ProcessorToAsynProcessorBridge extends DelegateProcessor implements AsyncProcessor {
 
         private ProcessorToAsynProcessorBridge(Processor processor) {
-            this.processor = processor;
+            super(processor);
         }
 
         public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -48,22 +48,6 @@
             // false means processing of the exchange asynchronously,
             callback.done(true);
             return true;
-        }
-
-        public void process(Exchange exchange) throws Exception {
-            processor.process(exchange);
-        }
-
-        public void start() throws Exception {
-            if (processor instanceof Service) {
-                ((Service)processor).start();
-            }
-        }
-
-        public void stop() throws Exception {
-            if (processor instanceof Service) {
-                ((Service)processor).stop();
-            }
         }
     }
 

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=581937&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
Thu Oct  4 07:49:03 2007
@@ -0,0 +1,55 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A Delegate pattern which delegates processing to a nested AsyncProcessor which can
+ * be useful for implementation inheritance when writing an {@link Policy}
+ */
+public class DelegateAsyncProcessor extends ServiceSupport implements AsyncProcessor {
+    protected AsyncProcessor processor;
+
+    public DelegateAsyncProcessor() {
+    }
+    public DelegateAsyncProcessor(AsyncProcessor processor) {
+        this.processor = processor;
+    }
+
+    @Override
+    public String toString() {
+        return "Delegate(" + processor + ")";
+    }
+
+    public AsyncProcessor getProcessor() {
+        return processor;
+    }
+
+    public void setProcessor(AsyncProcessor processor) {
+        this.processor = processor;
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(processor);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        return processor.process(exchange, callback);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
Thu Oct  4 07:49:03 2007
@@ -29,7 +29,7 @@
  * @version $Revision: 519941 $
  */
 public class DelegateProcessor extends ServiceSupport implements Processor {
-    private Processor processor;
+    protected Processor processor;
 
     public DelegateProcessor() {
     }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=581937&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Thu Oct  4 07:49:03 2007
@@ -0,0 +1,28 @@
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+
+/** 
+ * Handles calling the UnitOfWork.done() method when processing of an exchange
+ * is complete.
+ */
+public final class UnitOfWorkProcessor extends DelegateAsyncProcessor {
+
+    public UnitOfWorkProcessor(AsyncProcessor processor) {
+        super(processor);
+    }
+    
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        return processor.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                // Order here matters. We need to complete the callbacks since
+                // they will likely update the exchange with some final results.
+                callback.done(sync);
+                exchange.getUnitOfWork().done(exchange);
+            }
+        });
+    }
+
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java Thu
Oct  4 07:49:03 2007
@@ -41,4 +41,9 @@
      * @param synchronization
      */
     void removeSynchronization(Synchronization synchronization);
+
+    /**
+     * Invoked when this unit of work has been completed, whether it has failed or completed
+     */
+    void done(Exchange exchange);
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Thu Oct
 4 07:49:03 2007
@@ -25,6 +25,8 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -294,6 +296,25 @@
     protected void assertStringContains(String text, String containedText) {
         assertNotNull("Text should not be null!", text);
         assertTrue("Text: " + text + " does not contain: " + containedText, text.contains(containedText));
+    }
+
+    /**
+     * If a processor is wrapped with a bunch of DelegateProcessor or DelegateAsyncProcessor objects
+     * this call will drill through them and return the wrapped Processor.
+     * 
+     * @param processor
+     * @return
+     */
+    protected Processor unwrap(Processor processor) {
+        while( true ) {
+            if( processor instanceof DelegateAsyncProcessor ) {
+                processor = ((DelegateAsyncProcessor)processor).getProcessor();
+            } else if( processor instanceof DelegateProcessor ) {
+                processor = ((DelegateProcessor)processor).getProcessor();
+            } else {
+                return processor;
+            }
+        }
     }
 
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
Thu Oct  4 07:49:03 2007
@@ -22,6 +22,8 @@
 import org.apache.camel.TestSupport;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.processor.LoggingErrorHandler;
 import org.apache.camel.processor.RedeliveryPolicy;
@@ -78,7 +80,7 @@
             Endpoint key = route.getEndpoint();
             String endpointUri = key.getEndpointUri();
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = consumerRoute.getProcessor();
+            Processor processor = unwrap(consumerRoute.getProcessor());
 
             SendProcessor sendProcessor = null;
             if (endpointUri.equals("seda:a")) {
@@ -118,7 +120,7 @@
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
 
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = consumerRoute.getProcessor();
+            Processor processor = unwrap(consumerRoute.getProcessor());
 
             assertIsInstanceOf(DeadLetterChannel.class, processor);
         }
@@ -171,7 +173,7 @@
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = consumerRoute.getProcessor();
+            Processor processor = unwrap(consumerRoute.getProcessor());
 
             LoggingErrorHandler loggingProcessor = assertIsInstanceOf(LoggingErrorHandler.class, processor);
             FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, loggingProcessor.getOutput());

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=581937&r1=581936&r2=581937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Thu Oct  4 07:49:03 2007
@@ -433,7 +433,7 @@
      */
     protected Processor getProcessorWithoutErrorHandler(Route route) {
         EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-        Processor processor = consumerRoute.getProcessor();
+        Processor processor = unwrap(consumerRoute.getProcessor());
         return unwrapErrorHandler(processor);
     }
 

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java?rev=581937&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
Thu Oct  4 07:49:03 2007
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.Synchronization;
+
+/**
+ * @version $Revision$
+ */
+public class UnitOfWorkTest extends ContextTestSupport {
+    protected Synchronization synchronization;
+    protected Exchange completed;
+    protected Exchange failed;
+    protected String uri = "direct:foo";
+
+    public void testSuccess() throws Exception {
+        sendMessage();
+
+        assertNull("Should not have failed", failed);
+        assertNotNull("Should have received completed notification", completed);
+
+        log.info("Received completed: " + completed);
+    }
+
+    public void testFail() throws Exception {
+        sendMessage();
+
+        assertNull("Should not have completed", completed);
+        assertNotNull("Should have received failed notification", failed);
+
+        log.info("Received fail: " + failed);
+    }
+
+    public void testException() throws Exception {
+        sendMessage();
+
+        assertNull("Should not have completed", completed);
+        assertNotNull("Should have received failed notification", failed);
+
+        log.info("Received fail: " + failed);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        synchronization = new Synchronization() {
+            public void onComplete(Exchange exchange) {
+                completed = exchange;
+            }
+
+            public void onFailure(Exchange exchange) {
+                failed = exchange;
+            }
+        };
+
+        super.setUp();
+    }
+
+    protected void sendMessage() throws InterruptedException {
+        
+        template.send(uri, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("<hello>world!</hello>");
+                exchange.getUnitOfWork().addSynchronization(synchronization);
+            }
+        });
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:async").thread(1).to("direct:foo");
+                from("direct:foo").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        log.info("Received: " + exchange);
+
+                        String name = getName();
+                        if (name.equals("testFail")) {
+                            log.info("Failing test!");
+                            exchange.getFault(true).setBody("testFail() should always fail with a fault!");
+                        } else if (name.equals("testException")) {
+                            log.info("Throwing exception!");
+                            throw new Exception("Failing test!");
+                        }
+                    }
+                });
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java?rev=581937&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
Thu Oct  4 07:49:03 2007
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+/**
+ * @version $Revision$
+ */
+public class UnitOfWorkWithAsyncFlowTest extends UnitOfWorkTest {
+    @Override
+    protected void setUp() throws Exception {
+        uri = "direct:async";
+        super.setUp();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkWithAsyncFlowTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message