camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r722800 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/
Date Wed, 03 Dec 2008 07:27:53 GMT
Author: ningjiang
Date: Tue Dec  2 23:27:52 2008
New Revision: 722800

URL: http://svn.apache.org/viewvc?rev=722800&view=rev
Log:
CAMEL-1098 Applied patch with thanks to Tim

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastType.java Tue Dec  2 23:27:52 2008
@@ -17,7 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -49,7 +49,7 @@
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
     @XmlTransient
-    private ThreadPoolExecutor threadPoolExecutor;
+    private Executor executor;
 
     @Override
     public String toString() {
@@ -74,9 +74,9 @@
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
         if (threadPoolRef != null) {
-            threadPoolExecutor = routeContext.lookup(threadPoolRef, ThreadPoolExecutor.class);
+            executor = routeContext.lookup(threadPoolRef, Executor.class);
         }
-        return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), threadPoolExecutor);
+        return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executor);
     }
 
     public AggregationStrategy getAggregationStrategy() {
@@ -97,12 +97,12 @@
         return this;
     }
 
-    public ThreadPoolExecutor getThreadPoolExecutor() {
-        return threadPoolExecutor;
+    public Executor getExecutor() {
+        return executor;
     }
 
-    public MulticastType setThreadPoolExecutor(ThreadPoolExecutor executor) {
-        this.threadPoolExecutor = executor;
+    public MulticastType setThreadPoolExecutor(Executor executor) {
+        this.executor = executor;
         return this;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Tue Dec  2 23:27:52 2008
@@ -24,7 +24,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -298,7 +299,7 @@
      * @return a ThreadType builder that can be used to further configure the
      *         the thread pool.
      */
-    public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
+    public ProcessorType<Type> thread(Executor executor) {
         ThreadType answer = new ThreadType(executor);
         addOutput(answer);
         return this;
@@ -593,15 +594,15 @@
      *
      * @param expression the expression on which to split
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
-     * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} 
+     * @param executor override the default {@link Executor}
      * @return the builder
      */
     public SplitterType split(Expression expression, boolean parallelProcessing,
-                                 ThreadPoolExecutor threadPoolExecutor) {
+                                 Executor executor) {
         SplitterType answer = new SplitterType(expression);
         addOutput(answer);
         answer.setParallelProcessing(parallelProcessing);
-        answer.setThreadPoolExecutor(threadPoolExecutor);
+        answer.setExecutor(executor);
         return answer;
     }    
     
@@ -629,14 +630,14 @@
       * The splitter responds with the answer produced by the given {@link AggregationStrategy}.
      *
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
-     * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} 
+     * @param executor override the default {@link Executor}
      * @return the expression clause for the expression on which to split
      */
-    public ExpressionClause<SplitterType> split(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+    public ExpressionClause<SplitterType> split(boolean parallelProcessing, Executor executor) {
         SplitterType answer = new SplitterType();
         addOutput(answer);
         answer.setParallelProcessing(parallelProcessing);
-        answer.setThreadPoolExecutor(threadPoolExecutor);
+        answer.setExecutor(executor);
         return ExpressionClause.createAndSetExpression(answer);
     }    
     
@@ -669,16 +670,16 @@
      * @param expression the expression on which to split
      * @param aggregationStrategy the strategy used to aggregate responses for every part
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
-     * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} 
+     * @param executor override the default {@link Executor}
      * @return the builder
      */
     public SplitterType split(Expression expression, AggregationStrategy aggregationStrategy,
-                                 boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+                                 boolean parallelProcessing, Executor executor) {
         SplitterType answer = new SplitterType(expression);
         addOutput(answer);
         answer.setAggregationStrategy(aggregationStrategy);
         answer.setParallelProcessing(parallelProcessing);
-        answer.setThreadPoolExecutor(threadPoolExecutor);        
+        answer.setExecutor(executor);
         return answer;
     }    
     
@@ -708,16 +709,16 @@
      *
      * @param aggregationStrategy the strategy used to aggregate responses for every part
      * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
-     * @param threadPoolExecutor override the default {@link ThreadPoolExecutor} 
+     * @param executor override the default {@link Executor}
      * @return the expression clause for the expression on which to split
      */
     public ExpressionClause<SplitterType> split(AggregationStrategy aggregationStrategy, boolean parallelProcessing,
-                                                   ThreadPoolExecutor threadPoolExecutor) {
+                                                   Executor executor) {
         SplitterType answer = new SplitterType();
         addOutput(answer);
         answer.setAggregationStrategy(aggregationStrategy);
         answer.setParallelProcessing(parallelProcessing);
-        answer.setThreadPoolExecutor(threadPoolExecutor);           
+        answer.setExecutor(executor);
         return ExpressionClause.createAndSetExpression(answer);
     }   
     

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Dec  2 23:27:52 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@
     @XmlAttribute(required = false)
     private Boolean parallelProcessing;
     @XmlTransient
-    private ThreadPoolExecutor threadPoolExecutor;
+    private Executor executor;
     @XmlAttribute(required = false)
     private String threadPoolExecutorRef;
     @XmlAttribute(required = false)
@@ -80,9 +81,9 @@
         if (aggregationStrategy == null) {
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        threadPoolExecutor = createThreadPoolExecutor(routeContext);
+        executor = createThreadPoolExecutor(routeContext);
         return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
-                isParallelProcessing(), threadPoolExecutor, streaming);
+                isParallelProcessing(), executor, streaming);
     }
     
     public AggregationStrategy getAggregationStrategy() {
@@ -127,23 +128,23 @@
         return this;
     }
 
-    private ThreadPoolExecutor createThreadPoolExecutor(RouteContext routeContext) {
-        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
-        if (threadPoolExecutor == null && threadPoolExecutorRef != null) {
-            threadPoolExecutor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+    private Executor createThreadPoolExecutor(RouteContext routeContext) {
+        Executor executor = getExecutor();
+        if (executor == null && threadPoolExecutorRef != null) {
+            executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
         }
-        if (threadPoolExecutor == null) {
+        if (executor == null) {
             // fall back and use default
-            threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
+            executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         }
-        return threadPoolExecutor;
+        return executor;
     }    
    
-    public ThreadPoolExecutor getThreadPoolExecutor() {
-        return threadPoolExecutor;
+    public Executor getExecutor() {
+        return executor;
     }
 
-    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
-        this.threadPoolExecutor = threadPoolExecutor;
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java Tue Dec  2 23:27:52 2008
@@ -19,7 +19,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -63,7 +63,7 @@
     @XmlTransient
     private ThreadGroup threadGroup;
     @XmlTransient
-    private ThreadPoolExecutor executor;
+    private Executor executor;
 
     public ThreadType() {
     }
@@ -73,7 +73,7 @@
         this.maxSize = coreSize;
     }
 
-    public ThreadType(ThreadPoolExecutor executor) {
+    public ThreadType(Executor executor) {
         this.executor = executor;
     }
 
@@ -245,7 +245,7 @@
      * @param executor  the executor
      * @return the builder
      */
-    public ThreadType executor(ThreadPoolExecutor executor) {
+    public ThreadType executor(Executor executor) {
         setExecutor(executor);
         return this;
     }
@@ -292,11 +292,11 @@
         this.threadGroup = threadGroup;
     }
 
-    public ThreadPoolExecutor getExecutor() {
+    public Executor getExecutor() {
         return executor;
     }
 
-    public void setExecutor(ThreadPoolExecutor executor) {
+    public void setExecutor(Executor executor) {
         this.executor = executor;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Dec  2 23:27:52 2008
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -69,7 +70,7 @@
     private Collection<Processor> processors;
     private AggregationStrategy aggregationStrategy;
     private boolean isParallelProcessing;
-    private ThreadPoolExecutor executor;
+    private Executor executor;
     private final boolean streaming;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
 
@@ -81,11 +82,11 @@
         this(processors, aggregationStrategy, false, null);
     }
     
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
+    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor) {
         this(processors, aggregationStrategy, parallelProcessing, executor, false);
     }
 
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor, boolean streaming) {
+    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor, boolean streaming) {
         notNull(processors, "processors");
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
@@ -228,17 +229,17 @@
 
     protected void doStop() throws Exception {
         shutdown.set(true);
-        if (executor != null) {
-            executor.shutdown();
-            executor.awaitTermination(0, TimeUnit.SECONDS);
+        if (executor != null && executor instanceof ThreadPoolExecutor) {
+            ((ThreadPoolExecutor)executor).shutdown();
+            ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS);
         }
         ServiceHelper.stopServices(processors);
     }
 
     protected void doStart() throws Exception {
         shutdown.set(false);
-        if (executor != null) {
-            executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+        if (executor != null && executor instanceof ThreadPoolExecutor) {
+            ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(new RejectedExecutionHandler() {
                 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
                     ProcessCall call = (ProcessCall)runnable;
                     call.exchange.setException(new RejectedExecutionException());
@@ -274,7 +275,7 @@
         return aggregationStrategy;
     }
 
-    public ThreadPoolExecutor getExecutor() {
+    public Executor getExecutor() {
         return executor;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Dec  2 23:27:52 2008
@@ -22,7 +22,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -53,8 +53,8 @@
     }
 
     public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
-            boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor, boolean streaming) {
-        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, threadPoolExecutor, streaming);
+            boolean parallelProcessing, Executor executor, boolean streaming) {
+        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executor, streaming);
 
         this.expression = expression;
         notNull(expression, "expression");

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Tue Dec  2 23:27:52 2008
@@ -18,10 +18,11 @@
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
@@ -37,7 +38,7 @@
  */
 public class ThreadProcessor implements AsyncProcessor, Service {
 
-    private ThreadPoolExecutor executor;
+    private Executor executor;
     private long stackSize;
     private ThreadGroup threadGroup;
     private int priority = Thread.NORM_PRIORITY;
@@ -100,8 +101,10 @@
 
     public void stop() throws Exception {
         shutdown.set(true);
-        executor.shutdown();
-        executor.awaitTermination(0, TimeUnit.SECONDS);
+        if (executor instanceof ThreadPoolExecutor) {
+            ((ThreadPoolExecutor)executor).shutdown();
+            ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS);
+        }
     }
 
     public long getStackSize() {
@@ -179,7 +182,7 @@
         this.taskQueue = taskQueue;
     }
 
-    public ThreadPoolExecutor getExecutor() {
+    public Executor getExecutor() {
         if (executor == null) {
             executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() {
                 public Thread newThread(Runnable runnable) {
@@ -198,7 +201,7 @@
         return executor;
     }
 
-    public void setExecutor(ThreadPoolExecutor executor) {
+    public void setExecutor(Executor executor) {
         this.executor = executor;
     }
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Tue Dec  2 23:27:52 2008
@@ -88,7 +88,7 @@
                                                + " being less than: " + failUntilAttempt);
                 }
             }
-
+            // START SNIPPET: AsyncProcessor
             public boolean process(Exchange exchange, AsyncCallback callback) {         
      
                 Integer counter = exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
                                                              Integer.class);
@@ -105,6 +105,7 @@
                 callback.done(false);
                 return false;
             }
+            // END SNIPPET: AsyncProcessor
         };
 
         return new RouteBuilder() {

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=722800&r1=722799&r2=722800&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Tue Dec  2 23:27:52 2008
@@ -32,7 +32,7 @@
     protected ThreadPoolExecutor customThreadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
 
     public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
-        ThreadPoolExecutor threadPoolExecutor = getSplitter().getThreadPoolExecutor();
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutor();
         // this should be sufficient as core pool size is the only thing I changed from the default
         assertTrue(threadPoolExecutor.getCorePoolSize() == customThreadPoolExecutor.getCorePoolSize());
         assertTrue(threadPoolExecutor.getMaximumPoolSize() == customThreadPoolExecutor.getMaximumPoolSize());



Mime
View raw message