Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BF2D8183EF for ; Thu, 30 Jul 2015 00:28:51 +0000 (UTC) Received: (qmail 40973 invoked by uid 500); 30 Jul 2015 00:28:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 40921 invoked by uid 500); 30 Jul 2015 00:28:51 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 40908 invoked by uid 99); 30 Jul 2015 00:28:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jul 2015 00:28:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8215ADFFED; Thu, 30 Jul 2015 00:28:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jgenender@apache.org To: commits@activemq.apache.org Date: Thu, 30 Jul 2015 00:28:51 -0000 Message-Id: <3a9efb853c8a48239b37a95cc061b7e6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq git commit: AMQ-5903 - add patch that fixes the broker camel component to take all header values Repository: activemq Updated Branches: refs/heads/master 0b8639778 -> de86f473f AMQ-5903 - add patch that fixes the broker camel component to take all header values Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8f407a78 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8f407a78 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8f407a78 Branch: refs/heads/master Commit: 8f407a78dc5ed868c1b59ccb68f489c8ceae875f Parents: e8f8155 Author: Heath Kesler Authored: Wed Jul 29 17:48:16 2015 -0600 Committer: Heath Kesler Committed: Wed Jul 29 17:48:16 2015 -0600 ---------------------------------------------------------------------- .../camel/component/broker/BrokerProducer.java | 89 +++++++------------- .../broker/BrokerComponentXMLConfigTest.java | 55 ++++++++---- .../activemq/camel/component/broker/camel.xml | 45 +++++----- 3 files changed, 94 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8f407a78/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java index fcf1256..82adad4 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java @@ -16,17 +16,17 @@ */ package org.apache.activemq.camel.component.broker; -import java.util.Map; - import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQMessage; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.jms.JmsMessage; -import org.apache.camel.converter.ObjectConverter; import org.apache.camel.impl.DefaultAsyncProducer; +import javax.jms.JMSException; +import java.util.Map; + public class BrokerProducer extends DefaultAsyncProducer { private final BrokerEndpoint brokerEndpoint; @@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer { protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) { try { ActiveMQMessage message = getMessage(exchange); + if (message != null) { message.setDestination(brokerEndpoint.getDestination()); //if the ProducerBrokerExchange is null the broker will create it @@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer { return true; } - private ActiveMQMessage getMessage(Exchange exchange) throws Exception { - ActiveMQMessage result; - Message camelMessage; + private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException { + Message camelMessage = getMessageFromExchange(exchange); + checkOriginalMessage(camelMessage); + ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage(); + applyNewHeaders(result, camelMessage.getHeaders()); + return result; + } + + private Message getMessageFromExchange(Exchange exchange) { if (exchange.hasOut()) { - camelMessage = exchange.getOut(); - } else { - camelMessage = exchange.getIn(); + return exchange.getOut(); } - Map headers = camelMessage.getHeaders(); + return exchange.getIn(); + } + private void checkOriginalMessage(Message camelMessage) throws IllegalStateException { /** * We purposely don't want to support injecting messages half-way through * broker processing - use the activemq camel component for that - but - * we will support changing message headers and destinations + * we will support changing message headers and destinations. */ - if (camelMessage instanceof JmsMessage) { - JmsMessage jmsMessage = (JmsMessage) camelMessage; - if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) { - result = (ActiveMQMessage) jmsMessage.getJmsMessage(); - //lets apply any new message headers - setJmsHeaders(result, headers); - } else { - throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage()); - } - } else { + + if (!(camelMessage instanceof JmsMessage)) { throw new IllegalStateException("Not the original message from the broker " + camelMessage); } - return result; + javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage(); + + if (!(message instanceof ActiveMQMessage)) { + throw new IllegalStateException("Not the original message from the broker " + message); + } } - private void setJmsHeaders(ActiveMQMessage message, Map headers) { - message.setReadOnlyProperties(false); + private void applyNewHeaders(ActiveMQMessage message, Map headers) throws JMSException { for (Map.Entry entry : headers.entrySet()) { - if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) { - Object value = entry.getValue(); - if (value instanceof Number) { - Number number = (Number) value; - message.setJMSDeliveryMode(number.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JmsPriority")) { - Integer value = ObjectConverter.toInteger(entry.getValue()); - if (value != null) { - message.setJMSPriority(value.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSTimestamp(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSExpiration")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSExpiration(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) { - message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue())); - } - if (entry.getKey().equalsIgnoreCase("JMSType")) { - Object value = entry.getValue(); - if (value != null) { - message.setJMSType(value.toString()); - } + String key = entry.getKey(); + Object value = entry.getValue(); + if(value == null) { + continue; } + message.setObjectProperty(key, value.toString(), false); } } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/8f407a78/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java index c2fc3f6..2773baa 100644 --- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java @@ -16,22 +16,10 @@ */ package org.apache.activemq.camel.component.broker; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.xbean.BrokerFactoryBean; import org.junit.After; @@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import javax.jms.*; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class BrokerComponentXMLConfigTest { protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/"; @@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest { producerConnection = factory.createConnection(); producerConnection.start(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest { latch.await(timeOutInSeconds, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); - } @Test @@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest { assertEquals(0, divertLatch.getCount()); } + @Test + public void testPreserveOriginalHeaders() throws Exception { + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + Topic topic = consumerSession.createTopic(TOPIC_NAME); + + final CountDownLatch latch = new CountDownLatch(messageCount); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + assertEquals("321", message.getStringProperty("JMSXGroupID")); + assertEquals("custom", message.getStringProperty("CustomHeader")); + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + MessageProducer producer = producerSession.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + message.setStringProperty("JMSXGroupID", "123"); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/8f407a78/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml index 750c134..b84350b 100644 --- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml @@ -15,45 +15,46 @@ limitations under the License. --> - + - - 9 + + 321 + + + custom + - - - - - #{@destinationView.enqueueCount >= 100} - - - - - - + + + + + #{@destinationView.enqueueCount >= 100} + + + + + + - - + + -