From commits-return-60650-archive-asf-public=cust-asf.ponee.io@camel.apache.org Thu Jan 4 19:52:15 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 75E3E180657 for ; Thu, 4 Jan 2018 19:52:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 65FE9160C3F; Thu, 4 Jan 2018 18:52:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4DC85160C2B for ; Thu, 4 Jan 2018 19:52:14 +0100 (CET) Received: (qmail 83674 invoked by uid 500); 4 Jan 2018 18:52:13 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 83665 invoked by uid 99); 4 Jan 2018 18:52:13 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jan 2018 18:52:13 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E10FC856CB; Thu, 4 Jan 2018 18:52:11 +0000 (UTC) Date: Thu, 04 Jan 2018 18:52:11 +0000 To: "commits@camel.apache.org" Subject: [camel] 01/07: CAMEL-12120: Routingslip/Dynamic-Router EIPs can cause error handlers to be stopped for shared error handlers. Backported older working code. CAMEL-10050 is re-introduced again and we need to come up with a better solution. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: davsclaus@apache.org In-Reply-To: <151509193029.13799.3495770936535783563@gitbox.apache.org> References: <151509193029.13799.3495770936535783563@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: camel X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 9687a0ca2e9572c0970bc227c7755592e8652dc1 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180104185211.E10FC856CB@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git commit 9687a0ca2e9572c0970bc227c7755592e8652dc1 Author: Claus Ibsen AuthorDate: Thu Jan 4 10:12:57 2018 +0100 CAMEL-12120: Routingslip/Dynamic-Router EIPs can cause error handlers to be stopped for shared error handlers. Backported older working code. CAMEL-10050 is re-introduced again and we need to come up with a better solution. --- .../org/apache/camel/processor/RoutingSlip.java | 58 +++++++++------ .../camel/issues/RoutingSlipMemoryLeakTwoTest.java | 84 +++++++++++++++++++++ .../RoutingSlipMemoryLeakUniqueSlipsTest.java | 87 ++++++++++++++++++++++ .../issues/RoutingSlipNotStopErrorHandlerTest.java | 69 +++++++++++++++++ 4 files changed, 275 insertions(+), 23 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index 88aae16..4c4e574 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -17,6 +17,8 @@ package org.apache.camel.processor; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -42,6 +44,7 @@ import org.apache.camel.spi.RouteContext; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; @@ -70,6 +73,20 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace protected Expression expression; protected String uriDelimiter; protected final CamelContext camelContext; + private final ConcurrentMap errorHandlers = new ConcurrentHashMap(); + + /** + * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges + *

+ * This is similar to how multicast processor does. + */ + static final class PreparedErrorHandler extends KeyValueHolder { + + PreparedErrorHandler(String key, Processor value) { + super(key, value); + } + + } /** * The iterator to be used for retrieving the next routing slip(s) to be used. @@ -321,6 +338,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // this is needed to support redelivery on that output alone and not doing redelivery // for the entire routingslip/dynamic-router block again which will start from scratch again + // create key for cache + final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor); + + // lookup cached first to reuse and preserve memory + answer = errorHandlers.get(key); + if (answer != null) { + log.trace("Using existing error handler for: {}", processor); + return answer; + } + log.trace("Creating error handler for: {}", processor); ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); // create error handler (create error handler directly to keep it light weight, @@ -331,6 +358,9 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // must start the error handler ServiceHelper.startServices(answer); + // add to cache + errorHandlers.putIfAbsent(key, answer); + } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -430,16 +460,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace // copy results back to the original exchange ExchangeHelper.copyResults(original, current); - - if (target instanceof DeadLetterChannel) { - Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter(); - try { - ServiceHelper.stopService(deadLetter); - } catch (Exception e) { - log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e); - } - } - } catch (Throwable e) { + } catch (Throwable e) { exchange.setException(e); } @@ -449,18 +470,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace } }); - // stop error handler if we completed synchronously - if (answer) { - if (target instanceof DeadLetterChannel) { - Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter(); - try { - ServiceHelper.stopService(deadLetter); - } catch (Exception e) { - log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e); - } - } - } - return answer; } }); @@ -489,7 +498,10 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace } protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(producerCache); + ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers); + + // only clear error handlers when shutting down + errorHandlers.clear(); } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java new file mode 100644 index 0000000..34b174c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import java.lang.reflect.Field; +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.RoutingSlip; + +public class RoutingSlipMemoryLeakTwoTest extends ContextTestSupport { + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/output"); + super.setUp(); + } + + /** + * Reproducer for the memory leak: CAMEL-10048 + */ + public void testMemoryLeakInExceptionHandlerCaching() throws Exception { + int messageCount = 100; + for (int i = 0; i < messageCount; i++) { + template.sendBody("direct:start", "message " + i); + } + RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class); + assertNotNull(routingSlip); + + Map errorHandlers = getRoutingSlipErrorHandlers(routingSlip); + assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size()); + } + + private Map getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception { + Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers"); + errorHandlersField.setAccessible(true); + Map errorHandlers = (Map) errorHandlersField.get(routingSlip); + return errorHandlers; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .routingSlip(method(SlipProvider.class)).id("memory-leak"); + } + }; + } + + public static class SlipProvider { + + public String computeSlip(String body) { + /* + * It is important to have a processor here, that does not extend + * AsyncProcessor. Only in this case + * AsyncProcessorConverterHelper.convert() creates a new object, + * thus leading to a memory leak. For example, if you replace file + * endpoint with mock endpoint, then everything goes fine, because + * MockEndpoint.createProducer() creates an implementation of + * AsyncProcessor. + */ + return "file:target/output"; + } + } +} \ No newline at end of file diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java new file mode 100644 index 0000000..39db36c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.RoutingSlip; + +public class RoutingSlipMemoryLeakUniqueSlipsTest extends ContextTestSupport { + + private static final AtomicInteger counter = new AtomicInteger(0); + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/output"); + super.setUp(); + } + + /** + * Reproducer for the memory leak: CAMEL-10048/CAMEL-10050 + */ + public void testMemoryLeakInExceptionHandlerCaching() throws Exception { + int messageCount = 100; + for (int i = 0; i < messageCount; i++) { + template.sendBody("direct:start", "message " + i); + } + RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class); + assertNotNull(routingSlip); + + Map errorHandlers = getRoutingSlipErrorHandlers(routingSlip); + assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size()); + } + + private Map getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception { + Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers"); + errorHandlersField.setAccessible(true); + Map errorHandlers = (Map) errorHandlersField.get(routingSlip); + return errorHandlers; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .routingSlip(method(SlipProvider.class)).id("memory-leak"); + } + }; + } + + public static class SlipProvider { + + public String computeSlip(String body) { + /* + * It is important to have a processor here, that does not extend + * AsyncProcessor. Only in this case + * AsyncProcessorConverterHelper.convert() creates a new object, + * thus leading to a memory leak. For example, if you replace file + * endpoint with mock endpoint, then everything goes fine, because + * MockEndpoint.createProducer() creates an implementation of + * AsyncProcessor. + */ + return "mock:" + counter.incrementAndGet(); + } + } +} \ No newline at end of file diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java new file mode 100644 index 0000000..1b56196 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class RoutingSlipNotStopErrorHandlerTest extends ContextTestSupport { + + private static final String DIRECT_START = "direct:start"; + private static final String THROWING_ROUTE = "direct:throwingRoute"; + + public static class CustomRoutingSlip { + + public String router() { + return THROWING_ROUTE; + } + } + + @Test + public void testRoutingSlipNotStopErrorHandler() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(2); + + template.sendBody(DIRECT_START, "ABC"); + + template.sendBody(THROWING_ROUTE, "123"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + errorHandler(deadLetterChannel("mock:result") + .maximumRedeliveries(1) + .redeliveryDelay(10) + .retriesExhaustedLogLevel(LoggingLevel.ERROR) + .retryAttemptedLogLevel(LoggingLevel.WARN) + .logStackTrace(true) + .logRetryStackTrace(true) + ); + + from(DIRECT_START).routingSlip(method(CustomRoutingSlip.class, "router")); + + from(THROWING_ROUTE).process().exchange(o -> { + throw new IllegalStateException(); + }); + } + }; + } +} -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" .