camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rom...@apache.org
Subject svn commit: r643855 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/
Date Wed, 02 Apr 2008 11:32:29 GMT
Author: romkal
Date: Wed Apr  2 04:32:26 2008
New Revision: 643855

URL: http://svn.apache.org/viewvc?rev=643855&view=rev
Log:
CAMEL-426 : Added parallel processing functionality to splitter

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=643855&r1=643854&r2=643855&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Wed Apr  2 04:32:26 2008
@@ -501,6 +501,84 @@
 
     /**
      * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is then send to some endpoint.
+     * This splitter responds with the latest message returned from destination
+     * endpoint.
+     *
+     * @param receipients the expression on which to split
+     * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
+     * @return the builder
+     */
+    public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
+        SplitterType answer = new SplitterType(receipients);
+        addOutput(answer);
+        answer.setParallelProcessing(parallelProcessing);
+        return answer;
+    }
+
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is then send to some endpoint.
+     * This splitter responds with the latest message returned from destination
+     * endpoint.
+     *
+     * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
+     * @return the expression clause for the expression on which to split
+     */
+    public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
+        SplitterType answer = new SplitterType();
+        addOutput(answer);
+        answer.setParallelProcessing(parallelProcessing);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is then send to some endpoint.
+     * Answer from the splitter is produced using given {@link AggregationStrategy}
+     * @param partsExpression the expression on which to split
+     * @param aggregationStrategy the strategy used to aggregate responses for
+     *          every part
+     * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
+     * @return the builder
+     */
+    public SplitterType splitter(Expression partsExpression,
+            AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
+        SplitterType answer = new SplitterType(partsExpression);
+        addOutput(answer);
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setParallelProcessing(parallelProcessing);
+        return answer;
+    }
+
+    /**
+     * Creates the <a
+     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+     * pattern where an expression is evaluated to iterate through each of the
+     * parts of a message and then each part is then send to some endpoint.
+     * Answer from the splitter is produced using given {@link AggregationStrategy}
+     * @param aggregationStrategy the strategy used to aggregate responses for
+     *          every part
+     * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
+     * @return the expression clause for the expression on which to split
+     */
+    public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
+        SplitterType answer = new SplitterType();
+        addOutput(answer);
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setParallelProcessing(parallelProcessing);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    
+    /**
+     * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
      * pattern where a list of expressions are evaluated to be able to compare
      * the message exchanges to reorder them. e.g. you may wish to sort by some

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=643855&r1=643854&r2=643855&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Wed Apr  2 04:32:26 2008
@@ -16,8 +16,14 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -37,6 +43,10 @@
 public class SplitterType extends ExpressionNode {
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlAttribute(required = false)
+    private Boolean parallelProcessing;
+    @XmlTransient
+    private ThreadPoolExecutor threadPoolExecutor;
     
     public SplitterType() {
     }
@@ -60,7 +70,11 @@
         if (aggregationStrategy == null) {
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy);
+        if (threadPoolExecutor == null) {
+            threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
+        }
+        return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
+                isParallelProcessing(), threadPoolExecutor);
     }
     
     public AggregationStrategy getAggregationStrategy() {
@@ -69,5 +83,21 @@
 
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
+    }
+    
+    public boolean isParallelProcessing() {
+        return parallelProcessing != null ? parallelProcessing : false;
+    }
+
+    public void setParallelProcessing(boolean parallelProcessing) {
+        this.parallelProcessing = parallelProcessing;
+    }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() {
+        return threadPoolExecutor;
+    }
+
+    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
+        this.threadPoolExecutor = threadPoolExecutor;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=643855&r1=643854&r2=643855&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Apr  2 04:32:26 2008
@@ -20,6 +20,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
@@ -46,6 +48,26 @@
  * @version $Revision$
  */
 public class MulticastProcessor extends ServiceSupport implements Processor {
+    static class ProcessorExchangePair {
+        private final Processor processor;
+        private final Exchange exchange;
+        
+        public ProcessorExchangePair(Processor processor, Exchange exchange) {
+            this.processor = processor;
+            this.exchange = exchange;
+        }
+
+        public Processor getProcessor() {
+            return processor;
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+        
+        
+    }
+
     private Collection<Processor> processors;
     private AggregationStrategy aggregationStrategy;
     private boolean isParallelProcessing;
@@ -121,13 +143,18 @@
 
     public void process(Exchange exchange) throws Exception {
         Exchange result = null;
+
+        List<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
+        
         // Parallel Processing the producer
         if (isParallelProcessing) {
-            Exchange[] exchanges = new Exchange[processors.size()];
-            final CountDownLatch completedExchanges = new CountDownLatch(exchanges.length);
+            Exchange[] exchanges = new Exchange[pairs.size()];
+            final CountDownLatch completedExchanges = new CountDownLatch(pairs.size());
             int i = 0;
-            for (Processor producer : processors) {
-                exchanges[i] = copyExchangeStrategy(producer, exchange);
+            for (ProcessorExchangePair pair : pairs) {
+                Processor producer = pair.getProcessor();
+                exchanges[i] = pair.getExchange();
+                updateNewExchange(exchanges[i], i, pairs);
                 ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback() {
                     public void done(boolean doneSynchronously) {
                         completedExchanges.countDown();
@@ -150,16 +177,21 @@
 
         } else {
             // we call the producer one by one sequentially
-            for (Processor producer : processors) {
-                Exchange copy = copyExchangeStrategy(producer, exchange);
-                producer.process(copy);
+            int i = 0;
+            for (ProcessorExchangePair pair : pairs) {
+                Processor producer = pair.getProcessor();
+                Exchange subExchange = pair.getExchange();
+                updateNewExchange(subExchange, i, pairs);
+                
+                producer.process(subExchange);
                 if (aggregationStrategy != null) {
                     if (result == null) {
-                        result = copy;
+                        result = subExchange;
                     } else {
-                        result = aggregationStrategy.aggregate(result, copy);
+                        result = aggregationStrategy.aggregate(result, subExchange);
                     }
                 }
+                i++;
             }
         }
         if (result != null) {
@@ -167,6 +199,20 @@
         }
     }
 
+    protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
+        // No updates needed
+    }
+
+    protected List<ProcessorExchangePair> createProcessorExchangePairs(
+        Exchange exchange) {
+        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
+        Processor[] processorsArray = processors.toArray(new Processor[0]);
+        for (int i = 0; i < processorsArray.length; i++) {
+            result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy()));
+        }
+        return result;
+    }
+
     protected void doStop() throws Exception {
         shutdown.set(true);
         if (executor != null) {
@@ -197,16 +243,7 @@
         return processors;
     }
 
-    /**
-     * Strategy method to copy the exchange before sending to another endpoint.
-     * Derived classes such as the {@link Pipeline} will not clone the exchange
-     *
-     * @param processor the processor that will send the exchange
-     * @param exchange
-     * @return the current exchange if no copying is required such as for a
-     *         pipeline otherwise a new copy of the exchange is returned.
-     */
-    protected Exchange copyExchangeStrategy(Processor processor, Exchange exchange) {
-        return exchange.copy();
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=643855&r1=643854&r2=643855&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Wed Apr  2 04:32:26 2008
@@ -17,19 +17,21 @@
 package org.apache.camel.processor;
 
 
+import static org.apache.camel.util.ObjectHelper.notNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.converter.ObjectConverter;
-import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.CollectionHelper;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ServiceHelper;
-import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
  * Implements a dynamic <a
@@ -39,60 +41,55 @@
  *
  * @version $Revision$
  */
-public class Splitter extends ServiceSupport implements Processor {
+public class Splitter extends MulticastProcessor implements Processor {
     public static final String SPLIT_SIZE = "org.apache.camel.splitSize";
     public static final String SPLIT_COUNTER = "org.apache.camel.splitCounter";
 
-    private final Processor processor;
     private final Expression expression;
-    private final AggregationStrategy aggregationStrategy;
 
     public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
-        this.processor = destination;
+        this(expression, destination, aggregationStrategy, false, null);
+    }
+
+    public Splitter(Expression expression, Processor destination,
+            AggregationStrategy aggregationStrategy,
+            boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, threadPoolExecutor);
+        
         this.expression = expression;
-        this.aggregationStrategy = aggregationStrategy;
-        notNull(destination, "destination");
         notNull(expression, "expression");
-        notNull(aggregationStrategy, "aggregationStrategy");
+        notNull(destination, "destination");
     }
 
     @Override
     public String toString() {
-        return "Splitter[on: " + expression + " to: " + processor + " aggregate: " + aggregationStrategy + "]";
+        return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
     }
 
-    public void process(Exchange exchange) throws Exception {
+    @Override
+    protected List<ProcessorExchangePair> createProcessorExchangePairs(
+        Exchange exchange) {
         Object value = expression.evaluate(exchange);
-        Integer size = CollectionHelper.size(value);
-        Iterator iter = ObjectConverter.iterator(value);
-        int counter = 0;
-        Exchange result = null;
-        while (iter.hasNext()) {
+        Integer collectionSize = CollectionHelper.size(value);
+        List<ProcessorExchangePair> result;
+        if (collectionSize != null) {
+            result = new ArrayList<ProcessorExchangePair>(collectionSize);
+        } else {
+            result = new ArrayList<ProcessorExchangePair>();
+        }
+        for (Iterator<Object> iter = ObjectConverter.iterator(value); iter.hasNext(); ) {
             Object part = iter.next();
             Exchange newExchange = exchange.copy();
             Message in = newExchange.getIn();
             in.setBody(part);
-            if (size != null) {
-                in.setHeader(SPLIT_SIZE, size);
-            }
-            in.setHeader(SPLIT_COUNTER, counter++);
-            processor.process(newExchange);
-            if (result == null) {
-                result = newExchange;
-            } else {
-                result = aggregationStrategy.aggregate(result, newExchange);
-            }
-        }
-        if (result != null) {
-            ExchangeHelper.copyResults(exchange, result);
+            result.add(new ProcessorExchangePair(getProcessors().iterator().next(), newExchange));
         }
+        return result;
     }
 
-    protected void doStart() throws Exception {
-        ServiceHelper.startServices(processor);
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(processor);
+    @Override
+    protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
+        exchange.getIn().setHeader(SPLIT_COUNTER, i);
+        exchange.getIn().setHeader(SPLIT_SIZE, allPairs.size());
     }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=643855&r1=643854&r2=643855&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java Wed Apr  2 04:32:26 2008
@@ -17,6 +17,8 @@
 package org.apache.camel.processor;
 
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -35,7 +37,7 @@
         MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
         resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob");
 
-        template.send("direct:a", new Processor() {
+        template.send("direct:seqential", new Processor() {
             public void process(Exchange exchange) {
                 Message in = exchange.getIn();
                 in.setBody("James,Guillaume,Hiram,Rob");
@@ -58,7 +60,7 @@
         MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
         resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob", "Roman");
 
-        Exchange result = template.send("direct:a", new Processor() {
+        Exchange result = template.send("direct:seqential", new Processor() {
             public void process(Exchange exchange) {
                 Message in = exchange.getIn();
                 in.setBody("James,Guillaume,Hiram,Rob,Roman");
@@ -74,7 +76,7 @@
     }
     
     public void testEmptyBody() {
-        Exchange result = template.send("direct:a", new Processor() {
+        Exchange result = template.send("direct:seqential", new Processor() {
             public void process(Exchange exchange) throws Exception {
                 exchange.getIn().setHeader("foo", "bar");
             }
@@ -83,10 +85,64 @@
         assertNull(result.getOut(false));
     }
     
+    public void testSendingAMessageUsingMulticastReceivesItsOwnExchangeParallel() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        
+        resultEndpoint.expectsNoDuplicates(body());
+        resultEndpoint.expectedMessageCount(4);
+        
+        template.send("direct:parallel", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisifed();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        
+        Set<Integer> numbersFound = new TreeSet<Integer>();
+        
+        final String[] NAMES = {"James", "Guillaume", "Hiram", "Rob"};
+        
+        for (int i = 0; i < 4; i++) {
+            Exchange exchange = list.get(i);
+            Message in = exchange.getIn();
+            Integer splitCounter = in.getHeader(Splitter.SPLIT_COUNTER, Integer.class);
+            numbersFound.add(splitCounter);
+            assertEquals(NAMES[splitCounter], in.getBody());
+            assertMessageHeader(in, Splitter.SPLIT_SIZE, 4);
+        }
+        
+        assertEquals(4, numbersFound.size());
+    }
+
+    public void testSpliterWithAggregationStrategyParallel() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+
+        Exchange result = template.send("direct:parallel", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisifed();
+        Message out = result.getOut();
+
+        assertMessageHeader(out, "foo", "bar");
+        assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
+    }
+    
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a").splitter(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
+                from("direct:seqential").splitter(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
+                from("direct:parallel").splitter(body().tokenize(","), new MyAggregationStrategy(), true).to("mock:result");
             }
         };
     }



Mime
View raw message