camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r695502 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/processor/
Date Mon, 15 Sep 2008 15:17:25 GMT
Author: gertv
Date: Mon Sep 15 08:17:24 2008
New Revision: 695502

URL: http://svn.apache.org/viewvc?rev=695502&view=rev
Log:
CAMEL-876: splitter() should support batch for processing large files

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=695502&r1=695501&r2=695502&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Mon Sep 15 08:17:24 2008
@@ -35,6 +35,7 @@
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.concurrent.AtomicExchange;
 import org.apache.camel.util.concurrent.CountingLatch;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
@@ -69,6 +70,7 @@
     private AggregationStrategy aggregationStrategy;
     private boolean isParallelProcessing;
     private ThreadPoolExecutor executor;
+    private final boolean streaming;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
 
     public MulticastProcessor(Collection<Processor> processors) {
@@ -78,8 +80,12 @@
     public MulticastProcessor(Collection<Processor> processors, AggregationStrategy
aggregationStrategy) {
         this(processors, aggregationStrategy, false, null);
     }
-
+    
     public MulticastProcessor(Collection<Processor> processors, AggregationStrategy
aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
+        this(processors, aggregationStrategy, false, null, false);
+    }
+
+    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy
aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor, boolean streaming)
{
         notNull(processors, "processors");
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
@@ -92,6 +98,7 @@
                 this.executor = new ThreadPoolExecutor(processors.size(), processors.size(),
0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
             }
         }
+        this.streaming = streaming;
     }
 
     /**
@@ -138,7 +145,7 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        Exchange result = null;
+        final AtomicExchange result = new AtomicExchange();
 
         Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
         
@@ -149,13 +156,16 @@
             int i = 0;
             for (ProcessorExchangePair pair : pairs) {
                 Processor producer = pair.getProcessor();
-                Exchange subExchange = pair.getExchange();
+                final Exchange subExchange = pair.getExchange();
                 updateNewExchange(subExchange, i, pairs);
                 exchanges.add(subExchange);
                 completedExchanges.increment(); 
                 ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback()
{
                     public void done(boolean doneSynchronously) {
                         completedExchanges.decrement();
+                        if (streaming && aggregationStrategy != null) {
+                            doAggregate(result, subExchange);
+                        }
                     }
 
                 });
@@ -163,13 +173,9 @@
                 i++;
             }
             completedExchanges.await();
-            if (aggregationStrategy != null) {
+            if (!streaming && aggregationStrategy != null) {
                 for (Exchange resultExchange : exchanges) {
-                    if (result == null) {
-                        result = resultExchange;
-                    } else {
-                        result = aggregationStrategy.aggregate(result, resultExchange);
-                    }
+                    doAggregate(result, resultExchange);
                 }
             }
 
@@ -182,18 +188,29 @@
                 updateNewExchange(subExchange, i, pairs);
 
                 producer.process(subExchange);
-                if (aggregationStrategy != null) {
-                    if (result == null) {
-                        result = subExchange;
-                    } else {
-                        result = aggregationStrategy.aggregate(result, subExchange);
-                    }
-                }
+                doAggregate(result, subExchange);
                 i++;
             }
         }
-        if (result != null) {
-            ExchangeHelper.copyResults(exchange, result);
+        if (result.get() != null) {
+            ExchangeHelper.copyResults(exchange, result.get());
+        }
+    }
+
+    /**
+     * Aggregate the {@link Exchange} with the current {@link Result}
+     * @param result the current result 
+     * @param exchange the exchange to be added to the result
+     * 
+     * @return the new exchange, consisting of the aggregated information
+     */
+    protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
+        if (aggregationStrategy != null) {
+            if (result.get() == null) {
+                result.set(exchange);
+            } else {
+                result.set(aggregationStrategy.aggregate(result.get(), exchange));
+            }
         }
     }
 
@@ -232,6 +249,20 @@
         }
         ServiceHelper.startServices(processors);
     }
+    
+    /**
+     * Is the multicast processor working in streaming mode?
+     * 
+     * In streaming mode:
+     * <ul>
+     * <li>we use {@link Iterable} to ensure we can send messages as soon as the data
becomes available</li>
+     * <li>for parallel processing, we start aggregating responses as they get send
back to the processor;
+     * this means the {@link AggregatorStrategy} has to take care of handling out-of-order
arrival of exchanges</li>
+     * </ul>
+     */
+    protected boolean isStreaming() {
+        return streaming;
+    }
 
     /**
      * Returns the producers to multicast to

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=695502&r1=695501&r2=695502&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
Mon Sep 15 08:17:24 2008
@@ -47,7 +47,6 @@
     public static final String SPLIT_COUNTER = "org.apache.camel.splitCounter";
 
     private final Expression expression;
-    private final boolean streaming;
 
     public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy)
{
         this(expression, destination, aggregationStrategy, false, null, false);
@@ -56,10 +55,9 @@
     public Splitter(Expression expression, Processor destination,
             AggregationStrategy aggregationStrategy,
             boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor, boolean streaming)
{
-        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing,
threadPoolExecutor);
+        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing,
threadPoolExecutor, streaming);
 
         this.expression = expression;
-        this.streaming = streaming;
         notNull(expression, "expression");
         notNull(destination, "destination");
     }
@@ -73,7 +71,7 @@
     protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange
exchange) {
         Object value = expression.evaluate(exchange);
 
-        if (streaming) {
+        if (isStreaming()) {
             return createProcessorExchangePairsIterable(exchange, value);
         } else {
             return createProcessorExchangePairsList(exchange, value);

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java?rev=695502&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
Mon Sep 15 08:17:24 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Convenience class for holding an {@link Exchange} in a thread-safe way
+ * @author gert
+ *
+ */
+@SuppressWarnings("serial")
+public class AtomicExchange extends AtomicReference<Exchange> {
+
+}

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=695502&r1=695501&r2=695502&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
Mon Sep 15 08:17:24 2008
@@ -45,7 +45,7 @@
             }
         });
 
-        assertMockEndpointsSatisifed();
+        assertMockEndpointsSatisfied();
 
         List<Exchange> list = resultEndpoint.getReceivedExchanges();
         for (int i = 0; i < 4; i++) {
@@ -68,7 +68,7 @@
             }
         });
 
-        assertMockEndpointsSatisifed();
+        assertMockEndpointsSatisfied();
         Message out = result.getOut();
         assertEquals("Roman", out.getBody());
         assertMessageHeader(out, "foo", "bar");
@@ -99,7 +99,7 @@
             }
         });
 
-        assertMockEndpointsSatisifed();
+        assertMockEndpointsSatisfied();
 
         List<Exchange> list = resultEndpoint.getReceivedExchanges();
 
@@ -131,11 +131,31 @@
             }
         });
 
-        assertMockEndpointsSatisifed();
+        assertMockEndpointsSatisfied();
+        Message out = result.getOut();
+
+        assertMessageHeader(out, "foo", "bar");
+        assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
+    }
+    
+    public void testSpliterWithAggregationStrategyParallelStreaming() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+
+        Exchange result = template.send("direct:parallel-streaming", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
         Message out = result.getOut();
 
         assertMessageHeader(out, "foo", "bar");
         assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
+        System.out.println(result);
     }
     
     public void testSplitterWithStreaming() throws Exception {
@@ -166,6 +186,7 @@
                 from("direct:seqential").splitter(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
                 from("direct:parallel").splitter(body().tokenize(","), new MyAggregationStrategy(),
true).to("mock:result");
                 from("direct:streaming").splitter(body().tokenize(",")).streaming().to("mock:result");
+                from("direct:parallel-streaming").splitter(body().tokenize(","), new MyAggregationStrategy(),
true).streaming().to("mock:result");
             }
         };
     }



Mime
View raw message