camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r919331 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/
Date Fri, 05 Mar 2010 08:40:52 GMT
Author: davsclaus
Date: Fri Mar  5 08:40:51 2010
New Revision: 919331

URL: http://svn.apache.org/viewvc?rev=919331&view=rev
Log:
CAMEL-1588: More easy thread pool configuration in DSL. Work in progress.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalCustomPoolTest.java
      - copied, changed from r919310, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java
      - copied, changed from r919310, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java
    camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Fri Mar  5 08:40:51 2010
@@ -46,7 +46,7 @@
  */
 @XmlRootElement(name = "aggregate")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> {
+public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAware<AggregateDefinition> {
     @XmlElement(name = "correlationExpression", required = true)
     private ExpressionSubElementDefinition correlationExpression;
     @XmlElement(name = "completionPredicate", required = false)
@@ -567,23 +567,13 @@
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the sending the aggregated output.
-     *
-     * @param executorService the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public AggregateDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the sending the aggregated output.
-     *
-     * @param executorServiceRef reference to the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public AggregateDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java?rev=919331&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java Fri Mar  5 08:40:51 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Enables definitions to support concurrency using {@link java.util.concurrent.ExecutorService}
+ *
+ * @version $Revision$
+ */
+public interface ExecutorServiceAware<Type extends ProcessorDefinition> {
+
+    /**
+     * Setting the executor service for executing
+     *
+     * @param executorService the executor service
+     * @return the builder
+     */
+    <Type> Type executorService(ExecutorService executorService);
+
+    /**
+     * Setting the executor service for executing
+     *
+     * @param executorServiceRef reference for a {@link java.util.concurrent.ExecutorService}
+     *                           to lookup in the {@link org.apache.camel.spi.Registry}
+     * @return the builder
+     */
+    <Type> Type executorServiceRef(String executorServiceRef);
+
+    ExecutorService getExecutorService();
+
+    void setExecutorService(ExecutorService executorService);
+
+    String getExecutorServiceRef();
+
+    void setExecutorServiceRef(String executorServiceRef);
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAware.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Fri Mar  5 08:40:51 2010
@@ -37,7 +37,7 @@
  */
 @XmlRootElement(name = "multicast")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> {
+public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<MulticastDefinition> {
     @XmlAttribute(required = false)
     private Boolean parallelProcessing;
     @XmlAttribute(required = false)
@@ -126,22 +126,13 @@
         return this;
     }
        
-    /**
-     * Setting the executor service for executing the multicasting action.
-     *
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public MulticastDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the sending to the recipients.
-     *
-     * @param executorServiceRef reference to the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public MulticastDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java Fri Mar  5 08:40:51 2010
@@ -19,12 +19,14 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -40,7 +42,7 @@
  */
 @XmlRootElement(name = "onCompletion")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> {
+public class OnCompletionDefinition extends ProcessorDefinition<ProcessorDefinition> implements ExecutorServiceAware<OnCompletionDefinition> {
 
     @XmlAttribute(required = false)
     private Boolean onCompleteOnly = Boolean.FALSE;
@@ -50,6 +52,10 @@
     private WhenDefinition onWhen;
     @XmlElementRef
     private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
 
     public OnCompletionDefinition() {
     }
@@ -90,7 +96,16 @@
             throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
         }
 
-        return new OnCompletionProcessor(childProcessor, onCompleteOnly, onFailureOnly, when);
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+            }
+        }
+
+        OnCompletionProcessor answer = new OnCompletionProcessor(childProcessor, onCompleteOnly, onFailureOnly, when);
+        answer.setExecutorService(executorService);
+        return answer;
     }
 
     /**
@@ -112,6 +127,7 @@
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public ProcessorDefinition<? extends ProcessorDefinition<?>> end() {
         // pop parent block, as we added our self as block to parent when synchronized was defined in the route
         getParent().popBlock();
@@ -172,6 +188,17 @@
         return clause;
     }
 
+    @SuppressWarnings("unchecked")
+    public OnCompletionDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
+        setExecutorServiceRef(executorServiceRef);
+        return this;
+    }
 
     public List<ProcessorDefinition> getOutputs() {
         return outputs;
@@ -205,4 +232,19 @@
         this.onWhen = onWhen;
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public String getExecutorServiceRef() {
+        return executorServiceRef;
+    }
+
+    public void setExecutorServiceRef(String executorServiceRef) {
+        this.executorServiceRef = executorServiceRef;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Fri Mar  5 08:40:51 2010
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -1592,6 +1593,7 @@
      * destination gets a copy of the original message to avoid the processors
      * interfering with each other using {@link ExchangePattern#InOnly}.
      *
+     * @param uri  the destination
      * @return the builder
      */
     @SuppressWarnings("unchecked")
@@ -1604,6 +1606,46 @@
 
     /**
      * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
+     * Sends messages to all its child outputs; so that each processor and
+     * destination gets a copy of the original message to avoid the processors
+     * interfering with each other using {@link ExchangePattern#InOnly}.
+     *
+     * @param uri  the destination
+     * @param      executorService a custom {@link ExecutorService} to use as thread pool
+     *             for sending tapped exchanges
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type wireTap(String uri, ExecutorService executorService) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setUri(uri);
+        answer.setExecutorService(executorService);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
+     * Sends messages to all its child outputs; so that each processor and
+     * destination gets a copy of the original message to avoid the processors
+     * interfering with each other using {@link ExchangePattern#InOnly}.
+     *
+     * @param uri  the destination
+     * @param      executorServiceRef reference to lookup a custom {@link ExecutorService}
+     *             to use as thread pool for sending tapped exchanges
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type wireTap(String uri, String executorServiceRef) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setUri(uri);
+        answer.setExecutorServiceRef(executorServiceRef);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
      * Sends a new {@link org.apache.camel.Exchange} to the destination
      * using {@link ExchangePattern#InOnly}.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar  5 08:40:51 2010
@@ -41,7 +41,7 @@
  */
 @XmlRootElement(name = "recipientList")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class RecipientListDefinition extends ExpressionNode {
+public class RecipientListDefinition extends ExpressionNode implements ExecutorServiceAware<RecipientListDefinition> {
 
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
@@ -187,23 +187,13 @@
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the sending to the recipients.
-     *
-     * @param executorService the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public RecipientListDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the sending to the recipients.
-     *
-     * @param executorServiceRef reference to the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public RecipientListDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar  5 08:40:51 2010
@@ -40,7 +40,7 @@
  */
 @XmlRootElement(name = "split")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class SplitDefinition extends ExpressionNode {
+public class SplitDefinition extends ExpressionNode implements ExecutorServiceAware<SplitDefinition> {
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
     @XmlTransient
@@ -180,23 +180,13 @@
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the splitting action.
-     *
-     * @param executorService the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public SplitDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
     
-    /**
-     * Setting the executor service for executing the splitting action.
-     *
-     * @param executorServiceRef reference to the executor service
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public SplitDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar  5 08:40:51 2010
@@ -37,7 +37,7 @@
  */
 @XmlRootElement(name = "threads")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> {
+public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> implements ExecutorServiceAware<ThreadsDefinition> {
 
     @XmlTransient
     private ExecutorService executorService;
@@ -79,21 +79,13 @@
         return "Threads[" + getOutputs() + "]";
     }
 
-    /**
-     * Setting the executor service for the thread pool
-     *
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public ThreadsDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
 
-    /**
-     * Setting the executor service for the thread pool
-     *
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public ThreadsDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar  5 08:40:51 2010
@@ -40,7 +40,7 @@
  */
 @XmlRootElement(name = "to")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ToDefinition extends SendDefinition<ToDefinition> {
+public class ToDefinition extends SendDefinition<ToDefinition> implements ExecutorServiceAware<ToDefinition> {
     @XmlTransient
     private final List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlAttribute(required = false)
@@ -87,8 +87,14 @@
             return super.createProcessor(routeContext);
         }
 
+        // this code below is only for creating when async is enabled
+        // ----------------------------------------------------------
+
         if (executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+            }
         }
         if (executorService == null && poolSize != null) {
             executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() + "]", true);
@@ -173,20 +179,20 @@
     }
 
     /**
-     * Setting the executor service for executing the async routing.
-     *
-     * @return the builder
+     * Sets the optional {@link ExchangePattern} used to invoke this endpoint
      */
+    public ToDefinition pattern(ExchangePattern pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
     public ToDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
 
-    /**
-     * Setting the executor service for executing the async routing.
-     *
-     * @return the builder
-     */
+    @SuppressWarnings("unchecked")
     public ToDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java Fri Mar  5 08:40:51 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.ExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -37,16 +38,18 @@
  */
 @XmlRootElement(name = "wireTap")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class WireTapDefinition extends SendDefinition<WireTapDefinition> {
+public class WireTapDefinition extends SendDefinition<WireTapDefinition> implements ExecutorServiceAware<ProcessorDefinition> {
 
     @XmlTransient
     private Processor newExchangeProcessor;
-
     @XmlAttribute(name = "processorRef", required = false)
     private String newExchangeProcessorRef;
-
     @XmlElement(name = "body", required = false)
     private ExpressionSubElementDefinition newExchangeExpression;
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
 
     public WireTapDefinition() {
     }
@@ -72,6 +75,14 @@
             answer.setNewExchangeExpression(newExchangeExpression.createExpression(routeContext));
         }
 
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry.");
+            }
+        }
+        answer.setExecutorService(executorService);
+
         return answer;
     }
 
@@ -89,6 +100,20 @@
         return "wireTap";
     }
 
+    @SuppressWarnings("unchecked")
+    public ProcessorDefinition executorService(ExecutorService executorService) {
+        // wiretap has no outputs and therefore we cannot use custom wiretap builder methods in Java DSL
+        // as the Java DSL is stretched so far we can using regular Java
+        throw new UnsupportedOperationException("wireTap does not support these builder methods");
+    }
+
+    @SuppressWarnings("unchecked")
+    public ProcessorDefinition executorServiceRef(String executorServiceRef) {
+        // wiretap has no outputs and therefore we cannot use custom wiretap builder methods in Java DSL
+        // as the Java DSL is stretched so far we can using regular Java
+        throw new UnsupportedOperationException("wireTap does not support these builder methods");
+    }
+
     public Processor getNewExchangeProcessor() {
         return newExchangeProcessor;
     }
@@ -117,4 +142,19 @@
         this.newExchangeExpression = new ExpressionSubElementDefinition(expression);
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public String getExecutorServiceRef() {
+        return executorServiceRef;
+    }
+
+    public void setExecutorServiceRef(String executorServiceRef) {
+        this.executorServiceRef = executorServiceRef;
+    }
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalCustomPoolTest.java (from r919310, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalCustomPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalCustomPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java&r1=919310&r2=919331&rev=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalCustomPoolTest.java Fri Mar  5 08:40:51 2010
@@ -16,74 +16,31 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
  */
-public class OnCompletionGlobalTest extends ContextTestSupport {
-
-    public void testSynchronizeComplete() throws Exception {
-        getMockEndpoint("mock:sync").expectedBodiesReceived("Bye World");
-        getMockEndpoint("mock:sync").expectedPropertyReceived(Exchange.ON_COMPLETION, true);
-
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Bye World");
-
-        template.sendBody("direct:start", "Hello World");
-
-        assertMockEndpointsSatisfied();
-    }
-
-    public void testSynchronizeFailure() throws Exception {
-        getMockEndpoint("mock:sync").expectedMessageCount(1);
-        getMockEndpoint("mock:sync").expectedPropertyReceived(Exchange.ON_COMPLETION, true);
-
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(0);
-
-        try {
-            template.sendBody("direct:start", "Kabom");
-            fail("Should throw exception");
-        } catch (CamelExecutionException e) {
-            assertEquals("Kabom", e.getCause().getMessage());
-        }
-
-        assertMockEndpointsSatisfied();
-    }
+public class OnCompletionGlobalCustomPoolTest extends OnCompletionGlobalTest {
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // START SNIPPET: e1
-                // define a global on completion that is invoked when the exchage is complete
-                onCompletion().to("log:global").to("mock:sync");
+                ExecutorService pool = Executors.newFixedThreadPool(2);
+
+                // use a custom thread pool
+                onCompletion().executorService(pool).to("log:global").to("mock:sync");
 
                 from("direct:start")
                     .process(new MyProcessor())
                     .to("mock:result");
-                // END SNIPPET: e1
             }
         };
     }
 
-    public static class MyProcessor implements Processor {
-
-        public MyProcessor() {
-        }
-
-        public void process(Exchange exchange) throws Exception {
-            if ("Kabom".equals(exchange.getIn().getBody())) {
-                throw new IllegalArgumentException("Kabom");
-            }
-            exchange.getIn().setBody("Bye World");
-        }
-    }
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionGlobalTest.java Fri Mar  5 08:40:51 2010
@@ -63,7 +63,7 @@
             @Override
             public void configure() throws Exception {
                 // START SNIPPET: e1
-                // define a global on completion that is invoked when the exchage is complete
+                // define a global on completion that is invoked when the exchange is complete
                 onCompletion().to("log:global").to("mock:sync");
 
                 from("direct:start")

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java (from r919310, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java&r1=919310&r2=919331&rev=919331&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java Fri Mar  5 08:40:51 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -25,7 +28,7 @@
  *
  * @version $Revision$
  */
-public class WireTapTest extends ContextTestSupport {
+public class WireTapCustomPoolTest extends ContextTestSupport {
     protected MockEndpoint tap;
     protected MockEndpoint result;
 
@@ -50,9 +53,13 @@
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: e1
+                // use a custom thread pool for sending tapped messages
+                ExecutorService pool = Executors.newFixedThreadPool(2);
+
                 from("direct:start")
                     .to("log:foo")
-                    .wireTap("direct:tap")
+                    // pass in the custom pool to the wireTap DSL
+                    .wireTap("direct:tap", pool)
                     .to("mock:result");
                 // END SNIPPET: e1
 

Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala?rev=919331&r1=919330&r2=919331&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala (original)
+++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SThreadsDefinition.scala Fri Mar  5 08:40:51 2010
@@ -26,9 +26,6 @@
  */
 case class SThreadsDefinition(override val target: ThreadsDefinition)(implicit val builder: RouteBuilder) extends SAbstractDefinition[ThreadsDefinition] {
 
-  def executorService(service: ExecutorService) = wrap(target.executorService(service))
-  def executorService(ref: String) = wrap(target.executorServiceRef(ref))
-
   def poolSize(size: Int) = wrap(target.poolSize(size))
 
   def waitForTaskToComplete(wait: WaitForTaskToComplete) = wrap(target.waitForTaskToComplete(wait))



Mime
View raw message