Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D062D18E6F for ; Wed, 6 May 2015 12:53:12 +0000 (UTC) Received: (qmail 56215 invoked by uid 500); 6 May 2015 12:53:12 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 56170 invoked by uid 500); 6 May 2015 12:53:12 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 56161 invoked by uid 99); 6 May 2015 12:53:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2015 12:53:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 777D8DFF84; Wed, 6 May 2015 12:53:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Message-Id: <3a531a4de4324862b5d83617f4403a00@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: camel git commit: CAMEL-8522: camel-jms - now has SPI that allows components to hook into the JMS Message creation process to do any component specific enrichments on the created JMS message. Date: Wed, 6 May 2015 12:53:12 +0000 (UTC) Repository: camel Updated Branches: refs/heads/master 0bf8954cd -> ac06cc9dd CAMEL-8522: camel-jms - now has SPI that allows components to hook into the JMS Message creation process to do any component specific enrichments on the created JMS message. 
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ac06cc9d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ac06cc9d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ac06cc9d Branch: refs/heads/master Commit: ac06cc9ddcffdb45e1152f65d6af02aa52386eb3 Parents: 0bf8954 Author: Claus Ibsen Authored: Wed May 6 13:54:01 2015 +0200 Committer: Claus Ibsen Committed: Wed May 6 14:56:15 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/component/jms/JmsBinding.java | 32 +++++-- .../camel/component/jms/JmsComponent.java | 13 +++ .../camel/component/jms/JmsConfiguration.java | 14 +++ .../apache/camel/component/jms/JmsEndpoint.java | 8 ++ .../component/jms/MessageCreatedStrategy.java | 39 +++++++++ .../JmsMessageCreatedStrategyComponentTest.java | 85 ++++++++++++++++++ .../JmsMessageCreatedStrategyEndpointTest.java | 91 ++++++++++++++++++++ 7 files changed, 274 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index 84b20a2..b06dc35 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; @@ -75,24 +74,34 @@ public class JmsBinding { private final JmsEndpoint endpoint; private final 
HeaderFilterStrategy headerFilterStrategy; private final JmsKeyFormatStrategy jmsKeyFormatStrategy; + private final MessageCreatedStrategy messageCreatedStrategy; public JmsBinding() { this.endpoint = null; - headerFilterStrategy = new JmsHeaderFilterStrategy(false); - jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + this.headerFilterStrategy = new JmsHeaderFilterStrategy(false); + this.jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + this.messageCreatedStrategy = null; } public JmsBinding(JmsEndpoint endpoint) { this.endpoint = endpoint; if (endpoint.getHeaderFilterStrategy() != null) { - headerFilterStrategy = endpoint.getHeaderFilterStrategy(); + this.headerFilterStrategy = endpoint.getHeaderFilterStrategy(); } else { - headerFilterStrategy = new JmsHeaderFilterStrategy(endpoint.isIncludeAllJMSXProperties()); + this.headerFilterStrategy = new JmsHeaderFilterStrategy(endpoint.isIncludeAllJMSXProperties()); } if (endpoint.getJmsKeyFormatStrategy() != null) { - jmsKeyFormatStrategy = endpoint.getJmsKeyFormatStrategy(); + this.jmsKeyFormatStrategy = endpoint.getJmsKeyFormatStrategy(); + } else { + this.jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + } + if (endpoint.getMessageCreatedStrategy() != null) { + this.messageCreatedStrategy = endpoint.getMessageCreatedStrategy(); + } else if (endpoint.getComponent() != null) { + // fallback and use from component + this.messageCreatedStrategy = endpoint.getComponent().getMessageCreatedStrategy(); } else { - jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy(); + this.messageCreatedStrategy = null; } } @@ -234,7 +243,11 @@ public class JmsBinding { * @throws JMSException if the message could not be created */ public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException { - return makeJmsMessage(exchange, exchange.getIn(), session, null); + Message answer = makeJmsMessage(exchange, exchange.getIn(), session, null); + if (answer != null && messageCreatedStrategy 
!= null) { + messageCreatedStrategy.onMessageCreated(answer, session, exchange, null); + } + return answer; } /** @@ -291,6 +304,9 @@ public class JmsBinding { } } + if (answer != null && messageCreatedStrategy != null) { + messageCreatedStrategy.onMessageCreated(answer, session, exchange, null); + } return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index 070c44d..3baeba9 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -55,6 +55,7 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon private QueueBrowseStrategy queueBrowseStrategy; private HeaderFilterStrategy headerFilterStrategy; private ExecutorService asyncStartStopExecutorService; + private MessageCreatedStrategy messageCreatedStrategy; public JmsComponent() { super(JmsEndpoint.class); @@ -447,6 +448,18 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon this.headerFilterStrategy = strategy; } + public MessageCreatedStrategy getMessageCreatedStrategy() { + return messageCreatedStrategy; + } + + /** + * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message + * objects when Camel is sending a JMS message. 
+ */ + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + this.messageCreatedStrategy = messageCreatedStrategy; + } + // Implementation methods // ------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index e372293..7d8e381 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -206,6 +206,8 @@ public class JmsConfiguration implements Cloneable { private DefaultTaskExecutorType defaultTaskExecutorType; @UriParam private boolean includeAllJMSXProperties; + @UriParam + private MessageCreatedStrategy messageCreatedStrategy; public JmsConfiguration() { } @@ -1457,4 +1459,16 @@ public class JmsConfiguration implements Cloneable { public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) { this.includeAllJMSXProperties = includeAllJMSXProperties; } + + public MessageCreatedStrategy getMessageCreatedStrategy() { + return messageCreatedStrategy; + } + + /** + * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message + * objects when Camel is sending a JMS message. 
+ */ + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + this.messageCreatedStrategy = messageCreatedStrategy; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index 6389226..e50b1be 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -1066,6 +1066,14 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy getConfiguration().setJmsKeyFormatStrategy(jmsHeaderStrategy); } + public MessageCreatedStrategy getMessageCreatedStrategy() { + return getConfiguration().getMessageCreatedStrategy(); + } + + public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) { + getConfiguration().setMessageCreatedStrategy(messageCreatedStrategy); + } + @ManagedAttribute public boolean isTransferExchange() { return getConfiguration().isTransferExchange(); http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageCreatedStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageCreatedStrategy.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageCreatedStrategy.java new file mode 100644 index 0000000..1db60f9 --- /dev/null +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageCreatedStrategy.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.camel.Exchange; + +/** + * A strategy that allows custom components to plugin and perform custom logic when Camel creates {@link javax.jms.Message} instance. + *
<p/>
+ * For example to populate the message with custom information that are component specific and not part of the JMS specification. + */ +public interface MessageCreatedStrategy { + + /** + * Callback when the JMS message has just been created, which allows custom modifications afterwards. + * + * @param exchange the current exchange + * @param session the JMS session used to create the message + * @param cause optional exception occurred that should be sent as reply instead of a regular body + */ + void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause); +} http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyComponentTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyComponentTest.java new file mode 100644 index 0000000..1615561 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyComponentTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * @version + */ +public class JmsMessageCreatedStrategyComponentTest extends CamelTestSupport { + + protected String componentName = "activemq"; + + @Test + public void testMessageCreatedStrategy() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("beer", "Carlsberg"); + + template.sendBody("activemq:queue:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory)); + + JmsComponent jms = camelContext.getComponent(componentName, JmsComponent.class); + jms.setMessageCreatedStrategy(new MyMessageCreatedStrategy()); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("activemq:queue:foo") + .to("mock:result"); + } + }; + } + + private class MyMessageCreatedStrategy implements MessageCreatedStrategy { + + @Override + public void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause) { + 
try { + JmsMessageHelper.setProperty(message, "beer", "Carlsberg"); + } catch (JMSException e) { + // ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ac06cc9d/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyEndpointTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyEndpointTest.java new file mode 100644 index 0000000..708ebfb --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageCreatedStrategyEndpointTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * @version + */ +public class JmsMessageCreatedStrategyEndpointTest extends CamelTestSupport { + + protected String componentName = "activemq"; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myStrategy", new MyMessageCreatedStrategy()); + return jndi; + } + + @Test + public void testMessageCreatedStrategy() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("beer", "Carlsberg"); + + // must remember to use this on the producer side as its in use when sending + template.sendBody("activemq:queue:foo?messageCreatedStrategy=#myStrategy", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("activemq:queue:foo") + .to("mock:result"); + } + }; + } + + private class MyMessageCreatedStrategy implements MessageCreatedStrategy { + + @Override + 
public void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause) { + try { + JmsMessageHelper.setProperty(message, "beer", "Carlsberg"); + } catch (JMSException e) { + // ignore + } + } + } +}