Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 591F4105D4 for ; Fri, 23 Aug 2013 05:44:39 +0000 (UTC) Received: (qmail 87050 invoked by uid 500); 23 Aug 2013 05:44:33 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 86881 invoked by uid 500); 23 Aug 2013 05:44:27 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 86864 invoked by uid 99); 23 Aug 2013 05:44:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Aug 2013 05:44:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E663B8C366B; Fri, 23 Aug 2013 05:44:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Fri, 23 Aug 2013 05:44:24 -0000 Message-Id: <72ed312beace43ae9a1dd6d501b1d921@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: CAMEL-6447: Fixed CBR not working correctly when being nested. Also fixed endChoice to work better when nested. Updated Branches: refs/heads/camel-2.11.x ba58c25cd -> 27a5f833b refs/heads/master 922ea959d -> 020c451a7 CAMEL-6447: Fixed CBR not working correctly when being nested. Also fixed endChoice to work better when nested. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/020c451a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/020c451a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/020c451a Branch: refs/heads/master Commit: 020c451a794e8b9d0ed35cbd3ffbcfae4c53485f Parents: 922ea95 Author: Claus Ibsen Authored: Thu Aug 22 21:40:41 2013 +0200 Committer: Claus Ibsen Committed: Thu Aug 22 22:31:07 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 7 +- .../apache/camel/processor/ChoiceProcessor.java | 123 +++++++------------ .../camel/processor/NestedChoiceIssueTest.java | 74 +++++++++++ .../processor/TripleNestedChoiceIssueTest.java | 93 ++++++++++++++ .../async/AsyncNestedTripleChoiceIssueTest.java | 97 +++++++++++++++ .../processor/SpringNestedChoiceIssueTest.java | 32 +++++ .../processor/SpringNestedChoiceIssueTest.xml | 48 ++++++++ 7 files changed, 394 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 45889c3..9998e51 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -1295,8 +1295,13 @@ public abstract class ProcessorDefinition * @return the builder */ public ChoiceDefinition endChoice() { - // are we already a choice? + // are we nested choice? ProcessorDefinition def = this; + if (def.getParent() instanceof WhenDefinition) { + return (ChoiceDefinition) def.getParent().getParent(); + } + + // are we already a choice? if (def instanceof ChoiceDefinition) { return (ChoiceDefinition) def; } http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index 5af8b36..0310c9a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -30,10 +30,6 @@ import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ServiceHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.processor.PipelineHelper.continueProcessing; /** * Implements a Choice structure where one or more predicates are used which if @@ -43,7 +39,6 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * @version */ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate, Traceable { - private static final Logger LOG = LoggerFactory.getLogger(ChoiceProcessor.class); private final List filters; private final Processor otherwise; @@ -56,91 +51,61 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N AsyncProcessorHelper.process(this, exchange); } - public boolean process(Exchange exchange, AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { Iterator processors = next().iterator(); - exchange.setProperty(Exchange.FILTER_MATCHED, false); - while (continueRouting(processors, exchange)) { + // callback to restore existing FILTER_MATCHED property on the Exchange + final Object existing = exchange.getProperty(Exchange.FILTER_MATCHED); + final AsyncCallback choiceCallback = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + if (existing != null) { + exchange.setProperty(Exchange.FILTER_MATCHED, existing); + } else { + exchange.removeProperty(Exchange.FILTER_MATCHED); + } + callback.done(doneSync); + } + }; + + // as we only pick one processor to process, then no need to have async callback that has a while loop as well + // as this should not happen, eg we pick the first filter processor that matches, or the otherwise (if present) + // and if not, we just continue without using any processor + while (processors.hasNext()) { // get the next processor Processor processor = processors.next(); - AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); - boolean sync = process(exchange, callback, processors, async); - - // continue as long its being processed synchronously - if (!sync) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); - // the remainder of the CBR will be completed async - // so we break out now, then the callback will be invoked which then continue routing from where we left here - return false; + // evaluate the predicate on filter predicate early to be faster + // and avoid issues when having nested choices + // as we should only pick one processor + boolean matches = true; + if (processor instanceof FilterProcessor) { + FilterProcessor filter = (FilterProcessor) processor; + try { + matches = filter.getPredicate().matches(exchange); + exchange.setProperty(Exchange.FILTER_MATCHED, matches); + } catch (Throwable e) { + exchange.setException(e); + choiceCallback.done(true); + return true; + } + // as we have pre evaluated the predicate then use its processor directly when routing + processor = filter.getProcessor(); } - LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); - - // check for error if so we should break out - if (!continueProcessing(exchange, "so breaking out of content based router", LOG)) { - break; + // if we did not match then continue to next filter + if (!matches) { + continue; } - } - - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - callback.done(true); - return true; - } - - protected boolean continueRouting(Iterator it, Exchange exchange) { - boolean answer = it.hasNext(); - if (answer) { - Object matched = exchange.getProperty(Exchange.FILTER_MATCHED); - if (matched != null) { - boolean hasMatched = exchange.getContext().getTypeConverter().convertTo(Boolean.class, matched); - if (hasMatched) { - LOG.debug("ExchangeId: {} has been matched: {}", exchange.getExchangeId(), exchange); - answer = false; - } - } + // okay we found a filter or its the otherwise we are processing + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); + return async.process(exchange, choiceCallback); } - LOG.trace("ExchangeId: {} should continue matching: {}", exchange.getExchangeId(), answer); - return answer; - } - - private boolean process(final Exchange exchange, final AsyncCallback callback, - final Iterator processors, final AsyncProcessor asyncProcessor) { - // this does the actual processing so log at trace level - LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - - // implement asynchronous routing logic in callback so we can have the callback being - // triggered and then continue routing where we left - boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // we only have to handle async completion of the pipeline - if (doneSync) { - return; - } - // continue processing the pipeline asynchronously - while (continueRouting(processors, exchange)) { - AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); - - // check for error if so we should break out - if (!continueProcessing(exchange, "so breaking out of pipeline", LOG)) { - break; - } - - doneSync = process(exchange, callback, processors, processor); - if (!doneSync) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); - return; - } - } - - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - callback.done(false); - } - }); - - return sync; + // when no filter matches and there is no otherwise, then just continue + choiceCallback.done(true); + return true; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/camel-core/src/test/java/org/apache/camel/processor/NestedChoiceIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/NestedChoiceIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/NestedChoiceIssueTest.java new file mode 100644 index 0000000..bb74f3c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/NestedChoiceIssueTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class NestedChoiceIssueTest extends ContextTestSupport { + + public void testNestedChoiceBig() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 10); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceMed() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(1); + getMockEndpoint("mock:big").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 4); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceLow() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(1); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .choice() + .when(header("foo").isGreaterThan(1)) + .choice() + .when(header("foo").isGreaterThan(5)) + .to("mock:big") + .otherwise() + .to("mock:med") + .endChoice() + .otherwise() + .to("mock:low") + .end(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/camel-core/src/test/java/org/apache/camel/processor/TripleNestedChoiceIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/TripleNestedChoiceIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/TripleNestedChoiceIssueTest.java new file mode 100644 index 0000000..713deda --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/TripleNestedChoiceIssueTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class TripleNestedChoiceIssueTest extends ContextTestSupport { + + public void testNestedChoiceVeryBig() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 20); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceBig() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(1); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 10); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceMed() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(1); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 4); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceLow() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(1); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .choice() + .when(header("foo").isGreaterThan(1)) + .choice() + .when(header("foo").isGreaterThan(5)) + .choice() + .when(header("foo").isGreaterThan(10)) + .to("mock:verybig") + .otherwise() + .to("mock:big") + .endChoice() + .otherwise() + .to("mock:med") + .endChoice() + .otherwise() + .to("mock:low") + .end(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNestedTripleChoiceIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNestedTripleChoiceIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNestedTripleChoiceIssueTest.java new file mode 100644 index 0000000..2cc0561 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNestedTripleChoiceIssueTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class AsyncNestedTripleChoiceIssueTest extends ContextTestSupport { + + public void testNestedChoiceVeryBig() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 10); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceBig() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(1); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 7); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceMed() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(0); + getMockEndpoint("mock:med").expectedMessageCount(1); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 4); + + assertMockEndpointsSatisfied(); + } + + public void testNestedChoiceLow() throws Exception { + getMockEndpoint("mock:low").expectedMessageCount(1); + getMockEndpoint("mock:med").expectedMessageCount(0); + getMockEndpoint("mock:big").expectedMessageCount(0); + getMockEndpoint("mock:verybig").expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", 1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("async", new MyAsyncComponent()); + + from("direct:start") + .choice() + .when(header("foo").isGreaterThan(1)) + .to("async:bye:camel") + .choice() + .when(header("foo").isGreaterThan(5)) + .to("async:bye:camel2") + .choice() + .when(header("foo").isGreaterThan(7)) + .to("mock:verybig") + .otherwise() + .to("mock:big") + .endChoice() + .otherwise() + .to("mock:med") + .endChoice() + .otherwise() + .to("mock:low") + .end(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.java new file mode 100644 index 0000000..2a471ff --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.NestedChoiceIssueTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version + */ +public class SpringNestedChoiceIssueTest extends NestedChoiceIssueTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/020c451a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.xml new file mode 100644 index 0000000..ea9c7a3 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringNestedChoiceIssueTest.xml @@ -0,0 +1,48 @@ + + + + + + + + + + ${header.foo} > 1 + + + ${header.foo} > 5 + + + + + + + + + + + + + + +