camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r834165 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/converter/ camel-core/src/main/java/org/apache/camel/model/ camel...
Date Mon, 09 Nov 2009 18:19:40 GMT
Author: davsclaus
Date: Mon Nov  9 18:19:39 2009
New Revision: 834165

URL: http://svn.apache.org/viewvc?rev=834165&view=rev
Log:
CAMEL-2151: Introduced AsyncProcessor for async non blocking request reply processing. Work
in progress.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=834165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Mon Nov  9 18:19:39
2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Callback invoked when an {@link Exchange} processed by an {@link org.apache.camel.AsyncProcessor}
+ * has received its data and is ready to be routed.
+ *
+ * @version $Revision$
+ */
+public interface AsyncCallback {
+
+    /**
+     * Callback when the {@link Exchange} is ready to be routed as data has been received.
+     *
+     * @param exchange the exchange
+     */
+    void onDataReceived(Exchange exchange);
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=834165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Mon Nov  9 18:19:39
2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * An <b>asynchronous</b> processor which can process an {@link Exchange} in
an asynchronous fashion
+ * and signal completion by invoking the {@link AsyncCallback}.
+ * <p/>
+ * For example a {@link Producer} can implement this interface to support real asynchronous non blocking
+ * processing when used with the {@link org.apache.camel.processor.SendAsyncProcessor}.
+ *
+ * @version $Revision$
+ */
+public interface AsyncProcessor extends Processor {
+
+    /**
+     * Processes the message exchange
+     *
+     * @param exchange the message exchange
+     * @param callback the callback to invoke when data has been received and the {@link Exchange}
+     * is ready to continue being routed.
+     * @throws Exception if an internal processing error has occurred.
+     */
+    void process(Exchange exchange, AsyncCallback callback) throws Exception;
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Mon Nov  9 18:19:39 2009
@@ -24,7 +24,9 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@
     private SedaEndpoint endpoint;
     private Processor processor;
     private ExecutorService executor;
+    private ExceptionHandler exceptionHandler;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
@@ -55,6 +58,17 @@
         return endpoint;
     }
 
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
     public void run() {
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         while (queue != null && isRunAllowed()) {
@@ -70,7 +84,7 @@
                     try {
                         processor.process(exchange);
                     } catch (Exception e) {
-                        LOG.error("Seda queue caught: " + e, e);
+                        getExceptionHandler().handleException(e);
                     }
                 } else {
                     LOG.warn("This consumer is stopped during polling an exchange, so putting
it back on the seda queue: " + exchange);
@@ -78,6 +92,7 @@
                         queue.put(exchange);
                     } catch (InterruptedException e) {
                         LOG.debug("Sleep interrupted, are we stopping? " + (isStopping()
|| isStopped()));
+                        continue;
                     }
                 }
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
Mon Nov  9 18:19:39 2009
@@ -87,8 +87,8 @@
             answer = doConvertTo(type, exchange, value);
         } catch (Exception e) {
             // if its a ExecutionException then we have rethrow it as its not due to failed
conversion
-            boolean execution = ObjectHelper.getException(ExecutionException.class, e) !=
null ||
-                    ObjectHelper.getException(CamelExecutionException.class, e) != null;
+            boolean execution = ObjectHelper.getException(ExecutionException.class, e) !=
null
+                    || ObjectHelper.getException(CamelExecutionException.class, e) != null;
             if (execution) {
                 throw ObjectHelper.wrapCamelExecutionException(exchange, e);
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon
Nov  9 18:19:39 2009
@@ -320,6 +320,23 @@
     /**
      * Sends the exchange to the given endpoint
      *
+     * @param uri  the endpoint to send to
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type toAsync(String uri) {
+        ToDefinition answer = new ToDefinition(uri);
+        answer.setAsync(true);
+        addOutput(answer);
+        // must push a block so we have a child route for the async reply
+        // routing which is separated from the caller route
+        pushBlock(answer);
+        return (Type) this;
+    }
+
+    /**
+     * Sends the exchange to the given endpoint
+     *
      * @param uri  the String formatted endpoint uri to send to
      * @param args arguments for the string formatting of the uri
      * @return the builder

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Mon Nov
 9 18:19:39 2009
@@ -16,13 +16,21 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.SendAsyncProcessor;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;to/&gt; element
@@ -32,8 +40,18 @@
 @XmlRootElement(name = "to")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class ToDefinition extends SendDefinition<ToDefinition> {
+    @XmlTransient
+    private final List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlAttribute(required = false)
     private ExchangePattern pattern;
+    @XmlAttribute(required = false)
+    private Boolean async;
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
+    @XmlAttribute(required = false)
+    private Integer poolSize;
 
     public ToDefinition() {
     }
@@ -57,8 +75,48 @@
     }
 
     @Override
+    public List<ProcessorDefinition> getOutputs() {
+        return outputs;
+    }
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
+        if (async == null || !async) {
+            // when sync then let super create the processor
+            return super.createProcessor(routeContext);
+        }
+
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+        }
+        if (executorService == null && poolSize != null) {
+            executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync",
true);
+        }
+
+        // create the child processor which is the async route
+        Processor childProcessor = routeContext.createProcessor(this);
+
+        // create async processor
+        Endpoint endpoint = resolveEndpoint(routeContext);
+
+        SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), childProcessor);
+        if (executorService != null) {
+            async.setExecutorService(executorService);
+        }
+        if (poolSize != null) {
+            async.setPoolSize(poolSize);
+        }
+
+        return async;
+    }
+
+    @Override
     public String toString() {
-        return "To[" + getLabel() + "]";
+        if (async != null && async) {
+            return "ToAsync[" + getLabel() + "]";
+        } else {
+            return "To[" + getLabel() + "]";
+        }
     }
 
     @Override
@@ -71,6 +129,14 @@
         return pattern;
     }
 
+    public Boolean isAsync() {
+        return async;
+    }
+
+    public void setAsync(Boolean async) {
+        this.async = async;
+    }
+
     /**
      * Sets the optional {@link ExchangePattern} used to invoke this endpoint
      */

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=834165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
Mon Nov  9 18:19:39 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+
+/**
+ * @version $Revision$
+ */
+public class SendAsyncProcessor extends SendProcessor implements Runnable {
+
+    private static final int DEFAULT_THREADPOOL_SIZE = 10;
+    protected final Processor target;
+    protected final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>();
+    protected ExecutorService executorService;
+    protected int poolSize = DEFAULT_THREADPOOL_SIZE;
+    protected ExceptionHandler exceptionHandler;
+
+    public SendAsyncProcessor(Endpoint destination, Processor target) {
+        super(destination);
+        this.target = target;
+    }
+
+    public SendAsyncProcessor(Endpoint destination, ExchangePattern pattern, Processor target)
{
+        super(destination, pattern);
+        this.target = target;
+    }
+
+    @Override
+    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
+        // use a new copy of the exchange to route async and hand over the on completion to the new copy
+        // so it's the new copy that performs the on completion callback when it's done
+        final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+        if (pattern != null) {
+            copy.setPattern(pattern);
+        } else {
+            // default to use in out as we do request reply over async
+            copy.setPattern(ExchangePattern.InOut);
+        }
+        // configure the endpoint we are sending to
+        copy.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
+        // send the copy
+        return copy;
+    }
+
+    @Override
+    public Exchange doProcess(Exchange exchange) throws Exception {
+        // now we are done, we should have an API callback for this
+        // send the exchange to the destination using a producer
+        Exchange answer = getProducerCache(exchange).doInProducer(destination, exchange,
pattern, new ProducerCallback<Exchange>() {
+            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern
pattern) throws Exception {
+                exchange = configureExchange(exchange, pattern);
+
+                if (producer instanceof AsyncProcessor) {
+                    // let the producer use this callback to signal completion
+                    AsyncProcessor asyncProcessor = (AsyncProcessor) producer;
+
+                    // pass in the callback that adds the exchange to the completed list
of tasks
+                    final AsyncCallback callback = new AsyncCallback() {
+                        public void onDataReceived(Exchange exchange) {
+                            completedTasks.add(exchange);
+                        }
+                    };
+
+                    asyncProcessor.process(exchange, callback);
+                } else {
+                    // it's not a real AsyncProcessor so simulate async processing
+                    producer.process(exchange);
+                    completedTasks.add(exchange);
+                }
+
+                return exchange;
+            }
+        });
+
+        return answer;
+    }
+
+    @Override
+    public String toString() {
+        return "sendAsyncTo(" + destination + (pattern != null ? " " + pattern : "") + "
-> " + target + ")";
+    }
+
+    public ExecutorService getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public void run() {
+        while (isRunAllowed()) {
+            Exchange exchange;
+            try {
+                // TODO: Wonder if we can use take instead of poll with timeout?
+                exchange = completedTasks.poll(1000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+                continue;
+            }
+
+            if (exchange != null) {
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Async reply received now routing the Exchange: " + exchange);
+                    }
+                    target.process(exchange);
+                } catch (Exception e) {
+                    getExceptionHandler().handleException(e);
+                }
+            }
+        }
+    }
+
+    protected ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "SendAsyncProcessor",
true);
+    }
+
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        for (int i = 0; i < poolSize; i++) {
+            getExecutorService().execute(this);
+        }
+    }
+
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+        completedTasks.clear();
+
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Mon
Nov  9 18:19:39 2009
@@ -79,8 +79,19 @@
             }
         }
 
+        doProcess(exchange);
+    }
+
+    /**
+     * Strategy to process the exchange
+     *
+     * @param exchange the exchange
+     * @throws Exception can be thrown if error processing exchange
+     * @return the exchange that was processed
+     */
+    public Exchange doProcess(final Exchange exchange) throws Exception {
         // send the exchange to the destination using a producer
-        getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>()
{
+        return getProducerCache(exchange).doInProducer(destination, exchange, pattern, new
ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern
pattern) throws Exception {
                 exchange = configureExchange(exchange, pattern);
                 producer.process(exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java
Mon Nov  9 18:19:39 2009
@@ -33,6 +33,7 @@
 
         for (int i = 0; i < size; i++) {
             template.sendBody(url, "Message " + i);
+            Thread.sleep(3);
         }
 
         assertMockEndpointsSatisfied();

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java?rev=834165&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
Mon Nov  9 18:19:39 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class ToAsyncTest extends ContextTestSupport {
+
+    public void testToAsync() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:result").message(0).outBody(String.class).isEqualTo("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // and it should be different exchange ids
+
+        String ida = getMockEndpoint("mock:a").getReceivedExchanges().get(0).getExchangeId();
+        String idb = getMockEndpoint("mock:b").getReceivedExchanges().get(0).getExchangeId();
+        String idresult = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
+
+        // id a should be different and id b and id result the same
+        assertNotSame(ida, idb);
+        assertNotSame(ida, idresult);
+        assertSame(idb, idresult);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("mock:a").toAsync("direct:bar").to("mock:result");
+
+                from("direct:bar").to("mock:b").transform(constant("Bye World"));
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
(original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java
Mon Nov  9 18:19:39 2009
@@ -18,12 +18,12 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +45,7 @@
     private CountDownLatch bodyComplete = new CountDownLatch(1);
     private volatile boolean failed;
     private volatile Exchange exchange;
-    private volatile Collection<Exchange> completeTasks;
+    private volatile AsyncCallback callback;
 
     public JettyContentExchange() {
         // keep headers by default
@@ -56,8 +56,8 @@
         this.exchange = exchange;
     }
 
-    public void setCompleteTasks(Collection<Exchange> completeTasks) {
-        this.completeTasks = completeTasks;
+    public void setCallback(AsyncCallback callback) {
+        this.callback = callback;
     }
 
     @Override
@@ -82,12 +82,13 @@
             LOG.debug("onResponseComplete for " + getUrl());
         }
 
-        if (completeTasks != null && exchange != null) {
+        if (callback != null && exchange != null) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Adding Exchange to completed task: " + exchange);
             }
-            // we are complete so add the exchange to completed tasks
-            completeTasks.add(exchange);
+
+            // signal we are complete
+            callback.onDataReceived(exchange);
         }
     }
 

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java Mon Nov  9 18:19:39 2009
@@ -36,8 +36,6 @@
     private boolean sessionSupport;
     private List<Handler> handlers;
     private HttpClient client;
-    private boolean synchronous = true;
-    private int concurrentConsumers = 1;
 
     public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI httpURL) throws URISyntaxException {
         super(uri, component, httpURL);
@@ -85,20 +83,4 @@
         this.client = client;
     }
 
-    public boolean isSynchronous() {
-        return synchronous;
-    }
-
-    public void setSynchronous(boolean synchronous) {
-        this.synchronous = synchronous;
-    }
-
-    public int getConcurrentConsumers() {
-        return concurrentConsumers;
-    }
-
-    public void setConcurrentConsumers(int concurrentConsumers) {
-        this.concurrentConsumers = concurrentConsumers;
-    }
-    
 }

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Mon Nov  9 18:19:39 2009
@@ -19,21 +19,16 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.http.HttpMethods;
 import org.apache.camel.component.http.helper.HttpProducerHelper;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
-import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mortbay.jetty.client.HttpClient;
@@ -41,10 +36,8 @@
 /**
  * @version $Revision$
  */
-public class JettyHttpProducer extends DefaultProducer implements Runnable {
+public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(JettyHttpProducer.class);
-    private final BlockingQueue<Exchange> completeTasks = new LinkedBlockingQueue<Exchange>();
-    private ExecutorService executor;
     private final HttpClient client;
 
     // TODO: support that bridge option
@@ -64,37 +57,30 @@
         HttpClient client = getEndpoint().getClient();
 
         JettyContentExchange httpExchange = createHttpExchange(exchange);
-
-        if (getEndpoint().isSynchronous()) {
-            sendSynchronous(exchange, client, httpExchange);
-        } else {
-            sendAsynchronous(exchange, client, httpExchange);
-        }
+        sendSynchronous(exchange, client, httpExchange);
     }
+    public void process(Exchange exchange, AsyncCallback callback) throws Exception {
+        HttpClient client = getEndpoint().getClient();
 
-    protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange) throws IOException {
-        // use a new copy of the exchange to route async and handover the on completion to the new copy
-        // so its the new copy that performs the on completion callback when its done
-        final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
-        // the copy must be an in ouy MEP
-        copy.setPattern(ExchangePattern.InOut);
-
-        // configure http exchange to signal when its complete
-        httpExchange.setCompleteTasks(completeTasks);
-        httpExchange.setExchange(copy);
+        JettyContentExchange httpExchange = createHttpExchange(exchange);
+        sendAsynchronous(exchange, client, httpExchange, callback);
+    }
 
+    protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws IOException {
         // set the body with the message holder
-        copy.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure()));
+        exchange.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure()));
 
         doSendExchange(client, httpExchange);
+    }
+
+    protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange,
+                                    final AsyncCallback callback) throws IOException {
 
-        // now we need to let the original exchange to stop
-        // and let that copy exchange continue
         // TODO: Use something that marks it as async routed
-        exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-    }
+        exchange.setProperty("CamelSendAsync", Boolean.TRUE);
+        httpExchange.setCallback(callback);
+        httpExchange.setExchange(exchange);
 
-    protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws IOException {
         // set the body with the message holder
         exchange.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure()));
 
@@ -141,56 +127,16 @@
         }
     }
 
-    public void run() {
-        while (isRunAllowed()) {
-            Exchange exchange;
-            try {
-                // TODO: Wonder if we can use take instead of poll with timeout?
-                exchange = completeTasks.poll(1000, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
-                continue;
-            }
-
-            if (exchange != null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Continue to route Exchange: " + exchange);
-                }
-
-                // TODO: hook into exiting route path
-                exchange.getContext().createProducerTemplate().send("mock:result", exchange);
-            }
-        }
-    }
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
         client.start();
-
-        // this is only needed if we are asynchronous where we need to have a thread pool of listeners
-        // that will process the completed tasks
-        if (!getEndpoint().isSynchronous()) {
-            int poolSize = getEndpoint().getConcurrentConsumers();
-            executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, getEndpoint().getEndpointUri(), true);
-            for (int i = 0; i < poolSize; i++) {
-                executor.execute(this);
-            }
-        }
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-
         client.stop();
-
-        if (executor != null) {
-            executor.shutdownNow();
-            executor = null;
-        }
-        completeTasks.clear();
     }
 
 }

Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java Mon Nov  9 18:19:39 2009
@@ -33,7 +33,7 @@
     private static String thread1;
     private static String thread2;
 
-    private String url = "jetty://http://0.0.0.0:9123/foo?synchronous=false&concurrentConsumers=5";
+    private String url = "jetty://http://0.0.0.0:9123/foo";
 
     @Test
     public void testAsynchronous() throws Exception {
@@ -42,7 +42,7 @@
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.message(0).outBody().isEqualTo("Bye World");
+        mock.message(0).body().isEqualTo("Bye World");
 
         Object body = null;
         template.sendBody("direct:start", body);
@@ -61,7 +61,7 @@
                     public void process(Exchange exchange) throws Exception {
                         thread1 = Thread.currentThread().getName();
                     }
-                }).to(url).process(new Processor() {
+                }).toAsync(url).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         thread2 = Thread.currentThread().getName();
                     }

Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java Mon Nov  9 18:19:39 2009
@@ -33,7 +33,6 @@
         mock.message(0).outBody(String.class).contains("google");
 
         template.sendBody("direct:start", null);
-        System.out.println("I am not blocked");
 
         assertMockEndpointsSatisfied();
     }
@@ -45,7 +44,7 @@
             public void configure() throws Exception {
                 from("direct:start")
                     // to prevent redirect being thrown as an exception
-                    .to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=false&concurrentConsumers=5")
+                    .toAsync("jetty://http://www.google.com?throwExceptionOnFailure=false")
                     .to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java?rev=834165&r1=834164&r2=834165&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java Mon Nov  9 18:19:39 2009
@@ -47,7 +47,7 @@
             @Override
             public void configure() throws Exception {
                 // to prevent redirect being thrown as an exception
-                from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=true");
+                from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false");
             }
         };
     }



Mime
View raw message