Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 01CC118568 for ; Sat, 26 Mar 2016 00:01:34 +0000 (UTC) Received: (qmail 14725 invoked by uid 500); 26 Mar 2016 00:01:33 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 14667 invoked by uid 500); 26 Mar 2016 00:01:33 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 14652 invoked by uid 99); 26 Mar 2016 00:01:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Mar 2016 00:01:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90C7CE083B; Sat, 26 Mar 2016 00:01:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: raulk@apache.org To: commits@camel.apache.org Date: Sat, 26 Mar 2016 00:01:36 -0000 Message-Id: <477307bec253416e8d66b1b07762df32@git.apache.org> In-Reply-To: <68eea294dd1e4b1b830e16773525fa8e@git.apache.org> References: <68eea294dd1e4b1b830e16773525fa8e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] camel git commit: CAMEL-9765: Direct-VM - Header filter strategy & property propagation flag. CAMEL-9765: Direct-VM - Header filter strategy & property propagation flag. 
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5a60b6e0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5a60b6e0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5a60b6e0 Branch: refs/heads/jdk8-lambdas Commit: 5a60b6e0da927e27562c6afde23cf356eae9d264 Parents: 5aff922 Author: Raúl Kripalani Authored: Fri Mar 25 21:07:40 2016 +0000 Committer: Raúl Kripalani Committed: Sat Mar 26 00:00:47 2016 +0000 ---------------------------------------------------------------------- .../component/directvm/DirectVmComponent.java | 32 ++++++++ .../component/directvm/DirectVmConsumer.java | 1 - .../component/directvm/DirectVmEndpoint.java | 33 ++++++++- .../component/directvm/DirectVmProducer.java | 54 +++++++++----- .../DirectVmHeaderFilterStrategyTest.java | 77 ++++++++++++++++++++ ...ectVmNoPropertyPropagationComponentTest.java | 62 ++++++++++++++++ .../DirectVmNoPropertyPropagationTest.java | 61 ++++++++++++++++ 7 files changed, 299 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java index 3e48c7c..d5c341f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.Metadata; /** @@ -41,6 +42,9 @@ public 
class DirectVmComponent extends UriEndpointComponent { private boolean block; @Metadata(defaultValue = "30000") private long timeout = 30000L; + private HeaderFilterStrategy headerFilterStrategy; + @Metadata(defaultValue = "true") + private Boolean propagateProperties = Boolean.TRUE; public DirectVmComponent() { super(DirectVmEndpoint.class); @@ -65,6 +69,7 @@ public class DirectVmComponent extends UriEndpointComponent { answer.setBlock(block); answer.setTimeout(timeout); answer.configureProperties(parameters); + setProperties(answer, parameters); return answer; } @@ -132,4 +137,31 @@ public class DirectVmComponent extends UriEndpointComponent { public void setTimeout(long timeout) { this.timeout = timeout; } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + /** + * Sets a {@link HeaderFilterStrategy} that will only be applied on producer endpoints (on both directions: request and response). + *

<p>Default value: none.</p>

+ * @param headerFilterStrategy + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } + + public boolean isPropagateProperties() { + return propagateProperties; + } + + /** + * Whether to propagate or not properties from the producer side to the consumer side, and viceversa. + *

<p>Default value: true.</p>

+ * @param propagateProperties + */ + public void setPropagateProperties(boolean propagateProperties) { + this.propagateProperties = propagateProperties; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java index 4b18fe3..827e975 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java @@ -18,7 +18,6 @@ package org.apache.camel.component.directvm; import org.apache.camel.Processor; import org.apache.camel.Suspendable; -import org.apache.camel.SuspendableService; import org.apache.camel.impl.DefaultConsumer; /** http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java index db5d1c3..eeb3bdf 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java @@ -21,6 +21,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.direct.DirectConsumer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -43,6 +44,10 @@ public class 
DirectVmEndpoint extends DefaultEndpoint { private long timeout = 30000L; @UriParam(label = "producer") private boolean failIfNoConsumers = true; + @UriParam(label = "headerFilterStrategy") + private HeaderFilterStrategy headerFilterStrategy; + @UriParam(label = "propagateProperties", defaultValue = "false") + private Boolean propagateProperties; public DirectVmEndpoint(String endpointUri, DirectVmComponent component) { super(endpointUri, component); @@ -106,10 +111,36 @@ public class DirectVmEndpoint extends DefaultEndpoint { } /** - * Whether the producer should fail by throwing an exception, when sending to a DIRECT-VM endpoint with no active consumers. + * Whether the producer should fail by throwing an exception, when sending to a Direct-VM endpoint with no active consumers. */ public void setFailIfNoConsumers(boolean failIfNoConsumers) { this.failIfNoConsumers = failIfNoConsumers; } + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy == null ? getComponent().getHeaderFilterStrategy() : headerFilterStrategy; + } + + /** + * Sets a {@link HeaderFilterStrategy} that will only be applied on producer endpoints (on both directions: request and response). + *

<p>Default value: none.</p>

+ * @param headerFilterStrategy + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } + + public Boolean isPropagateProperties() { + return propagateProperties == null ? getComponent().isPropagateProperties() : propagateProperties; + } + + /** + * Whether to propagate or not properties from the producer side to the consumer side, and viceversa. + *

<p>Default value: true.</p>

+ * @param propagateProperties + */ + public void setPropagateProperties(Boolean propagateProperties) { + this.propagateProperties = propagateProperties; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java index 32fb395..d93d849 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java @@ -19,9 +19,10 @@ package org.apache.camel.component.directvm; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.spi.HeaderFilterStrategy; /** - * The direct-vm producer + * The Direct-VM producer. 
*/ public class DirectVmProducer extends DefaultAsyncProducer { @@ -33,24 +34,10 @@ public class DirectVmProducer extends DefaultAsyncProducer { } @Override - public void process(Exchange exchange) throws Exception { - // send to consumer - DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint); - if (consumer == null) { - if (endpoint.isFailIfNoConsumers()) { - throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); - } else { - log.debug("message ignored, no consumers available on endpoint: " + endpoint); - } - } else { - consumer.getProcessor().process(exchange); - } - } - - @Override public boolean process(Exchange exchange, AsyncCallback callback) { // send to consumer DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint); + if (consumer == null) { if (endpoint.isFailIfNoConsumers()) { exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange)); @@ -59,8 +46,37 @@ public class DirectVmProducer extends DefaultAsyncProducer { } callback.done(true); return true; - } else { - return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback); } + + final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy(); + + // Only clone the Exchange if we actually need to filter out properties or headers. + final Exchange submitted = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy(true) : exchange; + + // Clear properties in the copy if we are not propagating them. + if (!endpoint.isPropagateProperties()) { + submitted.getProperties().clear(); + } + + // Filter headers by Header Filter Strategy if there is one set. 
+ if (headerFilterStrategy != null) { + submitted.getIn().getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted)); + } + + return consumer.getAsyncProcessor().process(submitted, done -> { + exchange.setException(submitted.getException()); + exchange.getOut().copyFrom(submitted.hasOut() ? submitted.getOut() : submitted.getIn()); + + if (headerFilterStrategy != null) { + exchange.getOut().getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted)); + } + + if (endpoint.isPropagateProperties()) { + exchange.getProperties().putAll(submitted.getProperties()); + } + + callback.done(done); + }); } -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java new file mode 100644 index 0000000..40315d1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.directvm; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.HeaderFilterStrategy; + +/** + * + */ +public class DirectVmHeaderFilterStrategyTest extends ContextTestSupport { + + public void testPropertiesPropagatedOrNot() throws Exception { + context.getRegistry(JndiRegistry.class).bind("headerFilterStrategy", new HeaderFilterStrategy() { + @Override + public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, Exchange exchange) { + return headerName.equals("Header2"); + } + + @Override + public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) { + return headerName.equals("Header1"); + } + }); + + Exchange response = template.request("direct-vm:start.filter?headerFilterStrategy=#headerFilterStrategy", exchange -> { + exchange.getIn().setBody("Hello World"); + exchange.getIn().setHeader("Header1", "Value1"); + }); + + assertNull(response.getException()); + assertNull(response.getOut().getHeader("Header2")); + + response = template.request("direct-vm:start.nofilter", exchange -> { + exchange.getIn().setBody("Hello World"); + exchange.getIn().setHeader("Header1", "Value1"); + }); + + assertNull(response.getException()); + assertEquals("Value2", response.getOut().getHeader("Header2", String.class)); + + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { 
+ public void configure() throws Exception { + from("direct-vm:start.filter").process(exchange -> { + assertNull(exchange.getIn().getHeader("Header1")); + exchange.getIn().setHeader("Header2", "Value2"); + }); + + from("direct-vm:start.nofilter").process(exchange -> { + assertEquals("Value1", exchange.getIn().getHeader("Header1")); + exchange.getIn().setHeader("Header2", "Value2"); + }); + } + }; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java new file mode 100644 index 0000000..23b2598 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.directvm; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class DirectVmNoPropertyPropagationComponentTest extends ContextTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + DirectVmComponent directvm = new DirectVmComponent(); + directvm.setPropagateProperties(false); + context.addComponent("direct-vm", directvm); + + return context; + } + + public void testPropertiesPropagatedOrNot() throws Exception { + + + template.sendBody("direct-vm:start.default", "Hello World"); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // Starters. + from("direct-vm:start.default") + .setProperty("abc", constant("def")) + .to("direct-vm:foo.noprops"); + + // Asserters. + from("direct-vm:foo.noprops").process(exchange -> + assertNull(exchange.getProperty("abc")) + ); + + } + }; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5a60b6e0/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java new file mode 100644 index 0000000..6a94bba --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.directvm; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class DirectVmNoPropertyPropagationTest extends ContextTestSupport { + + public void testPropertiesPropagatedOrNot() throws Exception { + template.sendBody("direct-vm:start.noprops", "Hello World"); + template.sendBody("direct-vm:start.props", "Hello World"); + template.sendBody("direct-vm:start.default", "Hello World"); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // Starters. + from("direct-vm:start.noprops") + .setProperty("abc", constant("def")) + .to("direct-vm:foo.noprops?propagateProperties=false"); + + from("direct-vm:start.props") + .setProperty("abc", constant("def")) + .to("direct-vm:foo.props?propagateProperties=true"); + + from("direct-vm:start.default") + .setProperty("abc", constant("def")) + .to("direct-vm:foo.props"); + + // Asserters. + from("direct-vm:foo.noprops").process(exchange -> + assertNull(exchange.getProperty("abc")) + ); + + from("direct-vm:foo.props").process(exchange -> + assertEquals("def", exchange.getProperty("abc", String.class)) + ); + } + }; + } + +} \ No newline at end of file