Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 80395 invoked from network); 7 Jan 2011 16:42:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 7 Jan 2011 16:42:33 -0000 Received: (qmail 15531 invoked by uid 500); 7 Jan 2011 16:42:33 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 15440 invoked by uid 500); 7 Jan 2011 16:42:33 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 15432 invoked by uid 99); 7 Jan 2011 16:42:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jan 2011 16:42:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jan 2011 16:42:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CF5F823889C5; Fri, 7 Jan 2011 16:42:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1056380 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/MulticastProcessor.java main/java/org/apache/camel/processor/RecipientListProcessor.java test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java Date: Fri, 07 Jan 2011 16:42:08 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110107164208.CF5F823889C5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Fri Jan 7 16:42:08 2011 New Revision: 1056380 URL: http://svn.apache.org/viewvc?rev=1056380&view=rev Log: CAMEL-3497: Optimized multicast,splitter,recipient list to re-use error handlers for sub messages. This reduces memory consumption. Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056380&r1=1056379&r2=1056380&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Jan 7 16:42:08 2011 @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -52,6 +54,7 @@ import org.apache.camel.util.AsyncProces import org.apache.camel.util.CastUtils; import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.StopWatch; @@ -120,6 +123,19 @@ public class MulticastProcessor extends } + /** + * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges + *

+ * See the createProcessorExchangePair and createErrorHandler methods. + */ + static final class PreparedErrorHandler extends KeyValueHolder { + + public PreparedErrorHandler(RouteContext key, Processor value) { + super(key, value); + } + + } + private final CamelContext camelContext; private Collection processors; private final AggregationStrategy aggregationStrategy; @@ -128,6 +144,7 @@ public class MulticastProcessor extends private final boolean stopOnException; private final ExecutorService executorService; private final long timeout; + private final ConcurrentMap errorHandlers = new ConcurrentHashMap(); public MulticastProcessor(CamelContext camelContext, Collection processors) { this(camelContext, processors, null); @@ -672,27 +689,53 @@ public class MulticastProcessor extends // set property which endpoint we send to setToEndpoint(copy, prepared); - // TODO: optimize to reuse error handlers instead of re-building for each exchange pair // rework error handling to support fine grained error handling + prepared = createErrorHandler(exchange, prepared); + + return new DefaultProcessorExchangePair(index, processor, prepared, copy); + } + + protected Processor createErrorHandler(Exchange exchange, Processor processor) { + Processor answer = processor; + if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) { // wrap the producer in error handler so we have fine grained error handling on // the output side instead of the input side // this is needed to support redelivery on that output alone and not doing redelivery // for the entire multicast block again which will start from scratch again RouteContext routeContext = exchange.getUnitOfWork().getRouteContext(); + + // create key for cache + final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); + + // lookup cached first to reuse and preserve memory + answer = errorHandlers.get(key); + if (answer != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Using existing error handler for: " + processor); + } + return answer; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Creating error handler for: " + processor); + } ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder(); // create error handler (create error handler directly to keep it light weight, // instead of using ProcessorDefinition.wrapInErrorHandler) try { - prepared = builder.createErrorHandler(routeContext, prepared); + processor = builder.createErrorHandler(routeContext, processor); // and wrap in unit of work processor so the copy exchange also can run under UoW - prepared = new UnitOfWorkProcessor(prepared); + answer = new UnitOfWorkProcessor(processor); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } + + // add to cache + errorHandlers.putIfAbsent(key, answer); } - return new DefaultProcessorExchangePair(index, processor, prepared, copy); + return answer; } protected void doStart() throws Exception { @@ -707,6 +750,7 @@ public class MulticastProcessor extends protected void doStop() throws Exception { ServiceHelper.stopServices(processors); + errorHandlers.clear(); } protected static void setToEndpoint(Exchange exchange, Processor processor) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1056380&r1=1056379&r2=1056380&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Fri Jan 7 16:42:08 2011 @@ -26,12 +26,9 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -192,23 +189,7 @@ public class RecipientListProcessor exte setToEndpoint(copy, prepared); // rework error handling to support fine grained error handling - if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) { - // wrap the producer in error handler so we have fine grained error handling on - // the output side instead of the input side - // this is needed to support redelivery on that output alone and not doing redelivery - // for the entire multicast block again which will start from scratch again - RouteContext routeContext = exchange.getUnitOfWork().getRouteContext(); - ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder(); - // create error handler (create error handler directly to keep it light weight, - // instead of using ProcessorDefinition.wrapInErrorHandler) - try { - prepared = builder.createErrorHandler(routeContext, prepared); - // and wrap in unit of work processor so the copy exchange also can run under UoW - prepared = new UnitOfWorkProcessor(prepared); - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - } + prepared = createErrorHandler(exchange, prepared); return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java?rev=1056380&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java Fri Jan 7 16:42:08 2011 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StopWatch; +import org.apache.camel.util.TimeUtils; + +/** + * @version $Revision: 1043882 $ + */ +public class SplitterParallelBigFileTest extends ContextTestSupport { + + private int lines = 20000; + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/split"); + createDirectory("target/split"); + createBigFile(); + super.setUp(); + } + + private void createBigFile() throws Exception { + File file = new File("target/split/bigfile.txt"); + FileOutputStream fos = new FileOutputStream(file); + for (int i = 0; i < lines; i++) { + String line = "line-" + i + "\n"; + fos.write(line.getBytes()); + } + IOHelper.close(fos); + } + + public void testNoop() { + // noop + } + + // disabled due manual test + public void xxxtestSplitParallelBigFile() throws Exception { + StopWatch watch = new StopWatch(); + + NotifyBuilder builder = new NotifyBuilder(context).whenDone(lines + 1).create(); + boolean done = builder.matches(5, TimeUnit.MINUTES); + + log.info("Took " + TimeUtils.printDuration(watch.stop())); + + if (!done) { + throw new CamelException("Could not split file in 5 minutes"); + } + + // need a little sleep for capturing memory profiling + // Thread.sleep(60 * 1000); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // lower max pool to 10 for less number of concurrent threads + //context.getExecutorServiceStrategy().getDefaultThreadPoolProfile().setMaxPoolSize(10); + + from("file:target/split") + .split(body().tokenize("\n")).parallelProcessing() + .to("log:split?groupSize=1000"); + } + }; + } + +}