Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 559FF200AF8 for ; Thu, 5 May 2016 20:14:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 54577160A03; Thu, 5 May 2016 18:14:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 296081609F3 for ; Thu, 5 May 2016 20:14:33 +0200 (CEST) Received: (qmail 79264 invoked by uid 500); 5 May 2016 18:14:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 79255 invoked by uid 99); 5 May 2016 18:14:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 May 2016 18:14:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 41395DFE61; Thu, 5 May 2016 18:14:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Thu, 05 May 2016 18:14:32 -0000 Message-Id: <18161ffbaa5f46c49ee91ca276962aff@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue archived-at: Thu, 05 May 2016 18:14:34 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.13.x 0e78877f6 -> 6abf89f0a https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue (cherry picked from commit 4e63ee7cc7c4ed7d1fb8ae916c0984b974c175c0) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c276e2e6 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c276e2e6 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c276e2e6 Branch: refs/heads/activemq-5.13.x Commit: c276e2e652d2964cdc69ebf9b72663f0d985031d Parents: 0e78877 Author: Dejan Bosanac Authored: Mon Dec 21 15:19:01 2015 +0100 Committer: Timothy Bish Committed: Thu May 5 14:14:11 2016 -0400 ---------------------------------------------------------------------- .../region/virtual/VirtualTopicInterceptor.java | 11 +- .../MessageDestinationVirtualTopicTest.java | 120 +++++++++++++++++++ .../broker/virtual/SimpleMessageListener.java | 79 ++++++++++++ .../broker/virtual/VirtualTopicDLQTest.java | 2 +- .../virtual/virtual-topic-network-test.xml | 100 ++++++++++++++++ 5 files changed, 309 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index f673770..65d3efc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -91,7 +91,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { public void run() { try { if (exceptionAtomicReference.get() == null) { - dest.send(context, message.copy()); + dest.send(context, copy(message, dest.getActiveMQDestination())); } } catch (Exception e) { exceptionAtomicReference.set(e); @@ -112,7 +112,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { } else { for (final Destination dest : destinations) { if (shouldDispatch(broker, message, dest)) { - dest.send(context, message.copy()); + dest.send(context, copy(message, dest.getActiveMQDestination())); } } } @@ -121,6 +121,13 @@ public class VirtualTopicInterceptor extends DestinationFilter { } } + private Message copy(Message original, ActiveMQDestination target) { + Message msg = original.copy(); + msg.setDestination(target); + msg.setOriginalDestination(original.getDestination()); + return msg; + } + private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { LocalTransactionId result = null; if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java new file mode 100644 index 0000000..f370efc --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import javax.annotation.Resource; +import javax.jms.*; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration({ "virtual-topic-network-test.xml" }) +public class MessageDestinationVirtualTopicTest { + + private static final Logger LOG = LoggerFactory.getLogger(MessageDestinationVirtualTopicTest.class); + + private SimpleMessageListener listener1; + + private SimpleMessageListener listener2; + + @Resource(name = "broker1") + private BrokerService broker1; + + @Resource(name = "broker2") + private BrokerService broker2; + + private MessageProducer producer; + + private Session session1; + + public void init() throws JMSException { + // Create connection on Broker B2 + ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616"); + Connection connection2 = broker2ConnectionFactory.createConnection(); + connection2.start(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1"); + + // Bind listener on queue for consumer D + MessageConsumer consumer = session2.createConsumer(consumerDQueue); + listener2 = new SimpleMessageListener(); + consumer.setMessageListener(listener2); + + // Create connection on Broker B1 + ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection1 = broker1ConnectionFactory.createConnection(); + connection1.start(); + session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1"); + + // Bind listener on queue for consumer D + MessageConsumer consumer1 = session1.createConsumer(consumerCQueue); + listener1 = new SimpleMessageListener(); + consumer1.setMessageListener(listener1); + + // Create producer for topic, on B1 + Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1"); + producer = session1.createProducer(virtualTopicT1); + } + + @Test + public void testDestinationNames() throws Exception { + + LOG.info("Started waiting for broker 1 and 2"); + broker1.waitUntilStarted(); + broker2.waitUntilStarted(); + LOG.info("Broker 1 and 2 have started"); + + init(); + + // Create a monitor + CountDownLatch monitor = new CountDownLatch(2); + listener1.setCountDown(monitor); + listener2.setCountDown(monitor); + + LOG.info("Sending message"); + // Send a message on the topic + TextMessage message = session1.createTextMessage("Hello World !"); + producer.send(message); + LOG.info("Waiting for message reception"); + // Wait the two messages in the related queues + monitor.await(); + + // Get the message destinations + String lastJMSDestination2 = listener2.getLastJMSDestination(); + System.err.println(lastJMSDestination2); + String lastJMSDestination1 = listener1.getLastJMSDestination(); + System.err.println(lastJMSDestination1); + + // The destination names + assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2); + assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java new file mode 100644 index 0000000..166bfb5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleMessageListener implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListener.class); + + private CountDownLatch messageReceivedToken; + + private String lastJMSDestination; + + @Override + public void onMessage(Message message) { + try { + Thread.sleep(2000L); + if (message instanceof TextMessage) { + LOG.info("Dest:" + message.getJMSDestination()); + lastJMSDestination = message.getJMSDestination().toString(); + + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + Object object = propertyNames.nextElement(); + } + + } + messageReceivedToken.countDown(); + + } + catch (JMSException e) { + LOG.error("Error while listening to a message", message); + } + catch (InterruptedException e) { + LOG.error("Interrupted while listening to a message", message); + } + } + + /** + * @param countDown + * the countDown to set + */ + public void setCountDown(CountDownLatch countDown) { + this.messageReceivedToken = countDown; + } + + /** + * @return the lastJMSDestination + */ + public String getLastJMSDestination() { + return lastJMSDestination; + } + +} + http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java index 7c853cf..11e2d7f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java @@ -69,7 +69,7 @@ public class VirtualTopicDLQTest extends TestCase { // Expected Individual Dead Letter Queue names that are tied to the // Subscriber Queues - private static final String dlqPrefix = "ActiveMQ.DLQ.Topic."; + private static final String dlqPrefix = "ActiveMQ.DLQ.Queue."; // Number of messages private static final int numberMessages = 6; http://git-wip-us.apache.org/repos/asf/activemq/blob/c276e2e6/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml new file mode 100644 index 0000000..0c2b1ec --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +