camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r570471 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/component/file/ main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/component/vm/ main/java/org/apache/camel/impl/ m...
Date Tue, 28 Aug 2007 16:20:09 GMT
Author: chirino
Date: Tue Aug 28 09:20:08 2007
New Revision: 570471

URL: http://svn.apache.org/viewvc?rev=570471&view=rev
Log:
- Adding intial support for Aync processing of Exchanges:
see http://www.nabble.com/Asynchronous-Exchange-Processing-tf4313758s22882.html for discussions
on the topic.


Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java   (with
props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java   (with
props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
  (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=570471&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Tue
Aug 28 09:20:08 2007
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+
+/**
+ * The callback interface for an {@see AsyncProcessor} so that it can 
+ * notify you when an {@see Exchange} has completed. 
+ */
+public interface AsyncCallback {
+    
+    /**
+     * This method is invoked once the Exchange is completed.  If an error 
+     * occurred while processing the exchange, the exception field of the 
+     * {@see Exchange} being processed will hold the error. 
+     *  
+     * @param doneSynchronously set to true if the processing of the exchange was completed
synchronously thread.
+     */
+    void done(boolean doneSynchronously);    
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=570471&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Tue
Aug 28 09:20:08 2007
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+
+
+/**
+ * A more complex version of {@see Processor} which supports asynchronous
+ * processing of the {@see Exchange}.  Any processor can be coerced to
+ * have an {@see AsyncProcessor} interface by using the {@see AsyncProcessorTypeConverter.convert()}
+ * method.
+ * 
+ * @version $Revision$
+ */
+public interface AsyncProcessor extends Processor {
+
+    /**
+     * Processes the message exchange.  Similar to {@see Processor.process}, but
+     * the caller supports having the exchange asynchronously processed.
+     * 
+     * @param  The @{see AsyncCallback} will be invoked when the processing 
+     *         of the exchange is completed. If the exchange is completed synchronously,
then the 
+     *         callback is also invoked synchronously.  The callback should therefore be
careful of
+     *         starting recursive loop.
+     *         
+     * @return true if the processing was completed synchronously.
+     */
+    boolean process(Exchange exchange, AsyncCallback callback);
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Aug 28
09:20:08 2007
@@ -133,6 +133,11 @@
     void setException(Throwable e);
 
     /**
+     * Throws the exception associated with this exchange.
+     */
+    void throwException() throws Exception;
+
+    /**
      * Returns the container so that a processor can resolve endpoints from URIs
      * 
      * @return the container which owns this exchange
@@ -152,4 +157,5 @@
      * copied
      */
     void copyFrom(Exchange source);
+
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Tue Aug 28 09:20:08 2007
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.file;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.strategy.FileProcessStrategy;
 import org.apache.camel.impl.ScheduledPollConsumer;
@@ -23,6 +25,9 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * @version $Revision: 523016 $
@@ -62,8 +67,12 @@
             LOG.debug("Skipping directory " + fileOrDirectory);
         }
     }
+    
+    ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File,
File>();
 
     protected void pollFile(final File file) {
+        
+
         if (!file.exists()) {
             return;
         }
@@ -77,10 +86,15 @@
                     }
                     return;
                 }
+            } else {
+                if (filesBeingProcessed.contains(file)) {
+                    return;
+                }
+                filesBeingProcessed.put(file, file);
             }
 
-            FileProcessStrategy processStrategy = endpoint.getFileStrategy();
-            FileExchange exchange = endpoint.createExchange(file);
+            final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
+            final FileExchange exchange = endpoint.createExchange(file);
 
             if (isPreserveFileName()) {
                 String relativePath = file.getPath().substring(endpoint.getFile().getPath().length());
@@ -95,8 +109,25 @@
                     LOG.debug("About to process file:  " + file + " using exchange: " + exchange);
                 }
                 if (processStrategy.begin(endpoint, exchange, file)) {
-                    getProcessor().process(exchange);
-                    processStrategy.commit(endpoint, exchange, file);
+                    
+                    // Use the async processor interface so that processing of
+                    // the
+                    // exchange can happen asynchronously
+                    getAsyncProcessor().process(exchange, new AsyncCallback() {
+                        public void done(boolean sync) {
+                            if (exchange.getException() == null) {
+                                try {
+                                    processStrategy.commit(endpoint, (FileExchange)exchange,
file);
+                                } catch (Exception e) {
+                                    handleException(e);
+                                }
+                            } else {
+                                handleException(exchange.getException());
+                            }
+                            filesBeingProcessed.remove(file);
+                        }
+                    });
+                    
                 }
                 else {
                     if (LOG.isDebugEnabled()) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
Tue Aug 28 09:20:08 2007
@@ -31,8 +31,8 @@
  * @version $Revision: 1.1 $
  */
 public class SedaComponent<E extends Exchange> extends DefaultComponent<E> {
-    public BlockingQueue<E> createQueue() {
-        return new LinkedBlockingQueue<E>(1000);
+    public BlockingQueue<SedaEndpoint.Entry<E>> createQueue() {
+        return new LinkedBlockingQueue<SedaEndpoint.Entry<E>>(1000);
     }
 
     @Override

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Tue Aug 28 09:20:08 2007
@@ -19,10 +19,13 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,12 +36,12 @@
     private static final Log LOG = LogFactory.getLog(SedaConsumer.class);
 
     private SedaEndpoint<E> endpoint;
-    private Processor processor;
+    private AsyncProcessor processor;
     private Thread thread;
 
     public SedaConsumer(SedaEndpoint<E> endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = processor;
+        this.processor = AsyncProcessorTypeConverter.convert(processor);
     }
 
     @Override
@@ -48,21 +51,30 @@
 
     public void run() {
         while (!isStopping()) {
-            E exchange;
+            final SedaEndpoint.Entry<E> entry;
             try {
-                exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+                entry = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 break;
             }
-            if (exchange != null && !isStopping()) {
-                try {
-                    processor.process(exchange);
-                } catch (AlreadyStoppedException e) {
-                    LOG.debug("Ignoring failed message due to shutdown: " + e, e);
-                    break;
-                } catch (Throwable e) {
-                    LOG.error(e);
-                }
+            if (entry != null && !isStopping()) {
+                processor.process(entry.getExchange(), new AsyncCallback() {
+                    public void done(boolean sync) {
+                        if (entry.getCallback() != null) {
+                            entry.getCallback().done(false);
+                        } else {
+                            Throwable e = entry.getExchange().getException();
+                            if (e != null) {
+                                if (e instanceof AlreadyStoppedException) {
+                                    LOG.debug("Ignoring failed message due to shutdown: "
+ e, e);
+                                } else {
+                                    LOG.error(e);
+                                }
+                            }
+                        }
+                    }
+                });
+
             }
         }
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Tue Aug 28 09:20:08 2007
@@ -18,8 +18,11 @@
 
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -35,9 +38,47 @@
  * @version $Revision: 519973 $
  */
 public class SedaEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
-    private BlockingQueue<E> queue;
+    
+    static public class Entry<E extends Exchange> {
+        E exchange;
+        AsyncCallback callback;
+        
+        public Entry(E exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+        
+        public E getExchange() {
+            return exchange;
+        }
+        public void setExchange(E exchange) {
+            this.exchange = exchange;
+        }
+        public AsyncCallback getCallback() {
+            return callback;
+        }
+        public void setCallback(AsyncCallback callback) {
+            this.callback = callback;
+        }
+        
+    }
+    
+    private final class SedaProducer extends DefaultProducer implements AsyncProcessor {
+        private SedaProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+        public void process(Exchange exchange) {
+            queue.add(new Entry<E>(toExchangeType(exchange), null));
+        }
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            queue.add(new Entry<E>(toExchangeType(exchange), callback));
+            return false;
+        }
+    }
+
+    private BlockingQueue<Entry<E>> queue;
 
-    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<E> queue)
{
+    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Entry<E>>
queue) {
         super(endpointUri, component);
         this.queue = queue;
     }
@@ -47,11 +88,7 @@
     }
 
     public Producer<E> createProducer() throws Exception {
-        return new DefaultProducer(this) {
-            public void process(Exchange exchange) {
-                queue.add(toExchangeType(exchange));
-            }
-        };
+        return new SedaProducer(this);
     }
 
     public Consumer<E> createConsumer(Processor processor) throws Exception {
@@ -64,7 +101,7 @@
         return (E)new DefaultExchange(getContext());
     }
 
-    public BlockingQueue<E> getQueue() {
+    public BlockingQueue<Entry<E>> getQueue() {
         return queue;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
Tue Aug 28 09:20:08 2007
@@ -24,6 +24,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.component.seda.SedaComponent;
 import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.component.seda.SedaEndpoint.Entry;
 
 /**
  * An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
@@ -34,19 +35,19 @@
  * @version $Revision: 1.1 $
  */
 public class VmComponent<E extends Exchange> extends SedaComponent<E> {
-    protected static Map<String, BlockingQueue<Exchange>> queues = new HashMap<String,
BlockingQueue<Exchange>>();
+    protected static Map<String, BlockingQueue> queues = new HashMap<String, BlockingQueue>();
 
     @Override
     protected Endpoint<E> createEndpoint(String uri, String remaining, Map parameters)
throws Exception {
-        BlockingQueue<E> blockingQueue = (BlockingQueue<E>) getBlockingQueue(uri);
+        BlockingQueue<SedaEndpoint.Entry<E>> blockingQueue = (BlockingQueue<SedaEndpoint.Entry<E>>)
getBlockingQueue(uri);
         return new SedaEndpoint<E>(uri, this, blockingQueue);
     }
 
-    protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
+    protected BlockingQueue<Entry<E>> getBlockingQueue(String uri) {
         synchronized (queues) {
-            BlockingQueue<Exchange> answer = queues.get(uri);
+            BlockingQueue<Entry<E>> answer = queues.get(uri);
             if (answer == null) {
-                answer = (BlockingQueue<Exchange>) createQueue();
+                answer = createQueue();
                 queues.put(uri, answer);
             }
             return answer;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
Tue Aug 28 09:20:08 2007
@@ -16,10 +16,15 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.Future;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
 
@@ -29,6 +34,7 @@
 public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements
Consumer<E> {
     private Endpoint<E> endpoint;
     private Processor processor;
+    private AsyncProcessor asyncProcessor;
     private ExceptionHandler exceptionHandler;
 
     public DefaultConsumer(Endpoint<E> endpoint, Processor processor) {
@@ -47,6 +53,20 @@
 
     public Processor getProcessor() {
         return processor;
+    }
+
+    /**
+     * Provides an {@see AsyncProcessor} interface to the configured
+     * processor on the consumer.  If the processor does not implement
+     * the interface, it will be adapted so that it does.  
+     * 
+     * @return
+     */
+    public AsyncProcessor getAsyncProcessor() {
+        if (asyncProcessor == null) {
+            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+        }
+        return asyncProcessor;
     }
 
     public ExceptionHandler getExceptionHandler() {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Tue Aug 28 09:20:08 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.UuidGenerator;
 
 /**
@@ -182,6 +183,19 @@
         this.exception = exception;
     }
 
+    public void throwException() throws Exception {
+        if (exception == null) {
+            return;
+        }
+        if (exception instanceof Exception) {
+            throw (Exception)exception;
+        }
+        if (exception instanceof RuntimeException) {
+            throw (RuntimeException)exception;
+        }
+        throw new RuntimeCamelException(exception);
+    }
+
     public Message getFault() {
         return fault;
     }
@@ -222,4 +236,5 @@
             messageSupport.setExchange(this);
         }
     }
+
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=570471&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
Tue Aug 28 09:20:08 2007
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.converter;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
+import org.apache.camel.TypeConverter;
+
+/**
+ * A simple converter that can convert any Processor to an AsyncProcessor.
+ * Processing will still occur synchronously but it will provide the required
+ * notifications that the caller expects.
+ * 
+ * @version $Revision$
+ */
+public class AsyncProcessorTypeConverter implements TypeConverter {
+
+    public static final class ProcessorToAsynProcessorBridge implements AsyncProcessor, Service
{
+        private final Processor processor;
+
+        private ProcessorToAsynProcessorBridge(Processor processor) {
+            this.processor = processor;
+        }
+
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            processor.process(exchange);
+        }
+
+        public void start() throws Exception {
+            if (processor instanceof Service) {
+                ((Service)processor).start();
+            }
+        }
+
+        public void stop() throws Exception {
+            if (processor instanceof Service) {
+                ((Service)processor).stop();
+            }
+        }
+    }
+
+    public <T> T convertTo(Class<T> toType, Object value) {
+        if (value != null) {
+            if (toType.equals(AsyncProcessor.class)) {
+                if (value instanceof AsyncProcessor) {
+                    return toType.cast(value);
+                } else if (value instanceof Processor) {
+                    // Provide an async bridge to the regular processor.
+                    final Processor processor = (Processor)value;
+                    return toType.cast(new ProcessorToAsynProcessorBridge(processor));
+                }
+            }
+        }
+        return null;
+    }
+
+    public static AsyncProcessor convert(Processor value) {
+        if (value instanceof AsyncProcessor) {
+            return (AsyncProcessor)value;
+        }
+        return new ProcessorToAsynProcessorBridge(value);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
Tue Aug 28 09:20:08 2007
@@ -47,6 +47,7 @@
     public DefaultTypeConverter(Injector injector) {
         typeConverterLoaders.add(new AnnotationTypeConverterLoader());
         this.injector = injector;
+        addFallbackConverter(new AsyncProcessorTypeConverter());
         addFallbackConverter(new PropertyEditorTypeConverter());
         addFallbackConverter(new ToStringTypeConverter());
         addFallbackConverter(new ArrayTypeConverter());

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Tue Aug 28 09:20:08 2007
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ExceptionType;
-import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,13 +35,23 @@
  * 
  * @version $Revision$
  */
-public class DeadLetterChannel extends ErrorHandlerSupport {
+public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
     public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
     public static final String REDELIVERED = "org.apache.camel.Redelivered";
 
+    private class RedeliveryData {
+        int redeliveryCounter;
+        long redeliveryDelay;
+
+        // default behaviour which can be overloaded on a per exception basis
+        RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
+        Processor failureProcessor = deadLetter;
+    }
+
     private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
     private Processor output;
     private Processor deadLetter;
+    private AsyncProcessor outputAsync;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
 
@@ -47,14 +59,15 @@
         this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
     }
 
-    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
-                             Logger logger) {
+    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
Logger logger) {
         this.deadLetter = deadLetter;
-        this.output = output;
+        this.output = output;        
+        this.outputAsync = AsyncProcessorTypeConverter.convert(output);
+        
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
     }
-    
+
     public static <E extends Exchange> Logger createDefaultLogger() {
         return new Logger(LOG, LoggingLevel.ERROR);
     }
@@ -64,6 +77,67 @@
         return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy
+ "]";
     }
 
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        return process(exchange, callback, new RedeliveryData());
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData
data) {
+
+        while (true) {
+            if (exchange.getException() != null) {
+                Throwable e = exchange.getException();
+                exchange.setException(null); // Reset it since we are handling it.
+                
+                logger.log("On delivery attempt: " + data.redeliveryCounter + " caught: "
+ e, e);
+                data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+
+                ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+                if (exceptionPolicy != null) {
+                    data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
+                    Processor processor = exceptionPolicy.getErrorHandler();
+                    if (processor != null) {
+                        data.failureProcessor = processor;
+                    }
+                }
+            }
+
+            if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+                AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
+                return afp.process(exchange, callback);
+            }
+
+            if (data.redeliveryCounter > 0) {
+                // Figure out how long we should wait to resend this message.
+                data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+                sleep(data.redeliveryDelay);
+            }
+
+            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    // Only handle the async case...
+                    if (sync) {
+                        return;
+                    }
+                    if (exchange.getException() != null) {
+                        process(exchange, callback, data);
+                    } else {
+                        callback.done(sync);
+                    }
+                }
+            });
+            if (!sync) {
+                // It is going to be processed async..
+                return false;
+            }
+            if (exchange.getException() == null) {
+                // If everything went well.. then we exit here..
+                return true;
+            }
+            // error occured so loop back around.....
+        }
+
+    }
+    
     public void process(Exchange exchange) throws Exception {
         int redeliveryCounter = 0;
         long redeliveryDelay = 0;
@@ -86,7 +160,6 @@
                 logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e,
e);
                 redeliveryCounter = incrementRedeliveryCounter(exchange, e);
 
-
                 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
                 if (exceptionPolicy != null) {
                     currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
@@ -185,4 +258,5 @@
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(deadLetter, output);
     }
+
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
Tue Aug 28 09:20:08 2007
@@ -17,13 +17,18 @@
 package org.apache.camel.processor;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,7 +38,7 @@
  * 
  * @version $Revision$
  */
-public class Pipeline extends MulticastProcessor implements Processor {
+public class Pipeline extends MulticastProcessor implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
     public Pipeline(Collection<Processor> processors) {
@@ -48,7 +53,7 @@
         }
         return new Pipeline(processors);
     }
-
+    
     public void process(Exchange exchange) throws Exception {
         Exchange nextExchange = exchange;
         boolean first = true;
@@ -63,6 +68,77 @@
     }
 
     /**
+     * It would be nice if we could implement the sync process method as follows.. but we
+     * can't since the dead letter handler seem to like to handle the error but still 
+     * set the Exchange.exception field.  When that happens this method throws that
+     * exception but it seem that folks don't expect to get that exception.
+     * 
+     * @param exchange
+     * @throws Exception
+     */
+    public void xprocess(Exchange exchange) throws Exception {
+        // This could become a base class method for an AsyncProcessor
+        final CountDownLatch latch = new CountDownLatch(1);
+        if (!process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                if (sync) {
+                    return;
+                }
+                latch.countDown();
+            }
+        })) {
+            latch.await();
+        }
+        // If there was an exception associated with the exchange, throw it.
+        exchange.throwException();
+    }
+    
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Iterator<Processor> processors = getProcessors().iterator();
+        Exchange nextExchange = exchange;
+        while (processors.hasNext()) {
+            AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+            boolean sync = process(nextExchange, callback, processors, processor);
+            // Continue processing the pipeline synchronously ...
+            if (sync) {
+                nextExchange = createNextExchange(processor, exchange);
+            } else {
+                // The pipeline will be completed async...
+                return true;
+            }
+        }
+        // If we get here then the pipeline was processed entirely
+        // synchronously.
+        callback.done(true);
+        return true;
+    }
+
+    private boolean process(final Exchange exchange, final AsyncCallback callback, final
Iterator<Processor> processors, AsyncProcessor processor) {
+        return processor.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                
+                // We only have to handle async completion of
+                // the pipeline..  
+                if( sync ) {
+                    return;
+                }
+                
+                // Continue processing the pipeline... 
+                Exchange nextExchange = exchange;
+                while( processors.hasNext() ) {
+                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+                    nextExchange = createNextExchange(processor, exchange);
+                    sync = process( nextExchange, callback, processors, processor);
+                    if( !sync ) {
+                        return;
+                    }
+                }
+                callback.done(true);
+            }
+        });
+    }
+
+    /**
      * Strategy method to create the next exchange from the
      * 
      * @param producer the producer used to send to the endpoint
@@ -111,4 +187,5 @@
     public String toString() {
         return "Pipeline" + getProcessors();
     }
+
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=570471&r1=570470&r2=570471&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
Tue Aug 28 09:20:08 2007
@@ -16,22 +16,25 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Service;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class SendProcessor extends ServiceSupport implements Processor, Service {
+public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
     private static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
     private Endpoint destination;
     private Producer producer;
+    private AsyncProcessor processor;
 
     public SendProcessor(Endpoint destination) {
         if (destination == null) {
@@ -57,6 +60,21 @@
         }
     }
 
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (producer == null) {
+            if (isStopped()) {
+                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
+            } else {
+                exchange.setException(new IllegalStateException("No producer, this processor
has not been started!"));
+            }
+            callback.done(true);
+            return true;
+        } else {
+            return processor.process(exchange, callback);
+        }
+    }
+
+    
     public Endpoint getDestination() {
         return destination;
     }
@@ -64,6 +82,7 @@
     protected void doStart() throws Exception {
         this.producer = destination.createProducer();
         this.producer.start();
+        this.processor = AsyncProcessorTypeConverter.convert(producer);
     }
 
     protected void doStop() throws Exception {
@@ -72,6 +91,7 @@
                 producer.stop();
             } finally {
                 producer = null;
+                processor = null;
             }
         }
     }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java?rev=570471&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
Tue Aug 28 09:20:08 2007
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FileAsyncRouteTest extends ContextTestSupport {
+    protected Object expectedBody = "Hello there!";
+    protected String uri = "file:target/test-default-inbox?delete=true";
+
+    CountDownLatch receivedLatch = new CountDownLatch(1);
+    CountDownLatch processingLatch = new CountDownLatch(1);
+    AtomicReference<File> file = new AtomicReference<File>();
+
+    @Override
+    protected void tearDown() throws Exception {
+        processingLatch.countDown();
+        receivedLatch.countDown();
+        super.tearDown();
+    }
+    
+    public void testFileRoute() throws Exception {
+        MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        result.expectedBodiesReceived(expectedBody);
+        result.setDefaulResultWaitMillis(5000);
+
+        template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
+
+        // Wait till the exchange is delivered to the processor
+        assertTrue("Async processor received exchange", receivedLatch.await(5, TimeUnit.SECONDS));
+        File file = this.file.get();
+        
+        // The file consumer support async processing of the exchange,
+        // so the file should not get deleted until the exchange
+        // finishes being asynchronously processed.
+        Thread.sleep(1000);
+        assertTrue("File should exist", file.exists());
+
+        // Release the async processing thread so that the exchange completes
+        // and the file
+        // gets deleted.
+        processingLatch.countDown();
+        Thread.sleep(1000);
+        assertFalse("File should not exist", file.exists());
+
+        result.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).to("seda:a");
+                from("seda:a").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        file.set((File)exchange.getIn().getBody());
+                        // Simulate a processing delay..
+                        receivedLatch.countDown();
+                        processingLatch.await();
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message