activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue
Date Mon, 21 Dec 2015 14:19:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 370b18109 -> 4e63ee7cc


https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should
be the target queue


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4e63ee7c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4e63ee7c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4e63ee7c

Branch: refs/heads/master
Commit: 4e63ee7cc7c4ed7d1fb8ae916c0984b974c175c0
Parents: 370b181
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Mon Dec 21 15:19:01 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Mon Dec 21 15:19:22 2015 +0100

----------------------------------------------------------------------
 .../region/virtual/VirtualTopicInterceptor.java |  11 +-
 .../MessageDestinationVirtualTopicTest.java     | 120 +++++++++++++++++++
 .../broker/virtual/SimpleMessageListener.java   |  79 ++++++++++++
 .../broker/virtual/VirtualTopicDLQTest.java     |   2 +-
 .../virtual/virtual-topic-network-test.xml      | 100 ++++++++++++++++
 5 files changed, 309 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4e63ee7c/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index f673770..65d3efc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -91,7 +91,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
                             public void run() {
                                 try {
                                     if (exceptionAtomicReference.get() == null) {
-                                        dest.send(context, message.copy());
+                                        dest.send(context, copy(message, dest.getActiveMQDestination()));
                                     }
                                 } catch (Exception e) {
                                     exceptionAtomicReference.set(e);
@@ -112,7 +112,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
             } else {
                 for (final Destination dest : destinations) {
                     if (shouldDispatch(broker, message, dest)) {
-                        dest.send(context, message.copy());
+                        dest.send(context, copy(message, dest.getActiveMQDestination()));
                     }
                 }
             }
@@ -121,6 +121,13 @@ public class VirtualTopicInterceptor extends DestinationFilter {
         }
     }
 
+    private Message copy(Message original, ActiveMQDestination target) {
+        Message msg = original.copy();
+        msg.setDestination(target);
+        msg.setOriginalDestination(original.getDestination());
+        return msg;
+    }
+
     private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext
connectionContext, Message message) throws Exception {
         LocalTransactionId result = null;
         if (transactedSend && numDestinations > 1 && message.isPersistent()
&& message.getTransactionId() == null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4e63ee7c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
new file mode 100644
index 0000000..f370efc
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import javax.annotation.Resource;
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration({ "virtual-topic-network-test.xml" })
+public class MessageDestinationVirtualTopicTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDestinationVirtualTopicTest.class);
+
+    private SimpleMessageListener listener1;
+
+    private SimpleMessageListener listener2;
+
+    @Resource(name = "broker1")
+    private BrokerService broker1;
+
+    @Resource(name = "broker2")
+    private BrokerService broker2;
+
+    private MessageProducer producer;
+
+    private Session session1;
+
+    public void init() throws JMSException {
+        // Create connection on Broker B2
+        ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616");
+        Connection connection2 = broker2ConnectionFactory.createConnection();
+        connection2.start();
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
+
+        // Bind listener on queue for consumer D
+        MessageConsumer consumer = session2.createConsumer(consumerDQueue);
+        listener2 = new SimpleMessageListener();
+        consumer.setMessageListener(listener2);
+
+        // Create connection on Broker B1
+        ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        Connection connection1 = broker1ConnectionFactory.createConnection();
+        connection1.start();
+        session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
+
+        // Bind listener on queue for consumer D
+        MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
+        listener1 = new SimpleMessageListener();
+        consumer1.setMessageListener(listener1);
+
+        // Create producer for topic, on B1
+        Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1");
+        producer = session1.createProducer(virtualTopicT1);
+    }
+
+    @Test
+    public void testDestinationNames() throws Exception {
+
+        LOG.info("Started waiting for broker 1 and 2");
+        broker1.waitUntilStarted();
+        broker2.waitUntilStarted();
+        LOG.info("Broker 1 and 2 have started");
+
+        init();
+
+        // Create a monitor
+        CountDownLatch monitor = new CountDownLatch(2);
+        listener1.setCountDown(monitor);
+        listener2.setCountDown(monitor);
+
+        LOG.info("Sending message");
+        // Send a message on the topic
+        TextMessage message = session1.createTextMessage("Hello World !");
+        producer.send(message);
+        LOG.info("Waiting for message reception");
+        // Wait the two messages in the related queues
+        monitor.await();
+
+        // Get the message destinations
+        String lastJMSDestination2 = listener2.getLastJMSDestination();
+        System.err.println(lastJMSDestination2);
+        String lastJMSDestination1 = listener1.getLastJMSDestination();
+        System.err.println(lastJMSDestination1);
+
+        // The destination names
+        assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
+        assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/4e63ee7c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
new file mode 100644
index 0000000..166bfb5
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleMessageListener implements MessageListener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListener.class);
+
+  private CountDownLatch messageReceivedToken;
+
+  private String lastJMSDestination;
+
+  @Override
+  public void onMessage(Message message) {
+    try {
+      Thread.sleep(2000L);
+      if (message instanceof TextMessage) {
+        LOG.info("Dest:" + message.getJMSDestination());
+        lastJMSDestination = message.getJMSDestination().toString();
+
+        Enumeration propertyNames = message.getPropertyNames();
+        while (propertyNames.hasMoreElements()) {
+          Object object = propertyNames.nextElement();
+        }
+
+      }
+      messageReceivedToken.countDown();
+
+    }
+    catch (JMSException e) {
+      LOG.error("Error while listening to a message", message);
+    }
+    catch (InterruptedException e) {
+      LOG.error("Interrupted while listening to a message", message);
+    }
+  }
+
+  /**
+   * @param countDown
+   *          the countDown to set
+   */
+  public void setCountDown(CountDownLatch countDown) {
+    this.messageReceivedToken = countDown;
+  }
+
+  /**
+   * @return the lastJMSDestination
+   */
+  public String getLastJMSDestination() {
+    return lastJMSDestination;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/4e63ee7c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
index 7c853cf..11e2d7f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
@@ -69,7 +69,7 @@ public class VirtualTopicDLQTest extends TestCase {
 
     // Expected Individual Dead Letter Queue names that are tied to the
     // Subscriber Queues
-    private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
+    private static final String dlqPrefix = "ActiveMQ.DLQ.Queue.";
 
     // Number of messages
     private static final int numberMessages = 6;

http://git-wip-us.apache.org/repos/asf/activemq/blob/4e63ee7c/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
new file mode 100644
index 0000000..0c2b1ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml
@@ -0,0 +1,100 @@
+<!-- START SNIPPET: xbean -->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:jms="http://www.springframework.org/schema/jms"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+	http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
+	http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.11.0.xsd">
+
+	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+	<!-- Broker 1 definition -->
+	<amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker1" brokerName="B1"
useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true"
startAsync="true">
+
+		<!-- Transport protocol -->
+		<amq:transportConnectors>
+			<amq:transportConnector uri="tcp://localhost:61616" />
+		</amq:transportConnectors>
+
+		<!-- Network of brokers setup -->
+		<!--amq:networkConnectors>
+			<amq:networkConnector name="linkToBrokerB2" uri="static:(tcp://localhost:62616)" networkTTL="1"
duplex="false"/>
+		</amq:networkConnectors-->
+
+		<amq:destinationInterceptors>
+
+			<amq:virtualDestinationInterceptor>
+				<amq:virtualDestinations>
+					<!-- Virtual topic policies -->
+					<!-- they should be local to avoid message duplicate -->
+					<amq:virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
+				</amq:virtualDestinations>
+			</amq:virtualDestinationInterceptor>
+		</amq:destinationInterceptors>
+
+
+		<destinationPolicy>
+			<policyMap>
+				<policyEntries>
+					<policyEntry queue=">" producerFlowControl="true"  memoryLimit="4 mb">
+						<networkBridgeFilterFactory>
+							<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
+						</networkBridgeFilterFactory>
+					</policyEntry>
+				</policyEntries>
+			</policyMap>
+		</destinationPolicy>
+
+
+		<amq:destinations>
+			<!-- topics -->
+			<amq:topic physicalName="VirtualTopic.T1" />
+		</amq:destinations>
+
+	</amq:broker>
+
+	<!-- Broker 2 definition -->
+	<amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker2" brokerName="B2"
useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true"
startAsync="true">
+
+		<!-- Transport protocol -->
+		<amq:transportConnectors>
+			<amq:transportConnector uri="tcp://localhost:62616" />
+		</amq:transportConnectors>
+
+		<!-- Network of brokers setup -->
+		<amq:networkConnectors>
+			<amq:networkConnector name="linkToBrokerB1" uri="static:(tcp://localhost:61616)" networkTTL="1"
duplex="true" />
+		</amq:networkConnectors>
+
+		<amq:destinationInterceptors>
+
+			<amq:virtualDestinationInterceptor>
+				<amq:virtualDestinations>
+					<!-- Virtual topic policies -->
+					<!-- they should be local to avoid message duplicate -->
+					<amq:virtualTopic name=">" prefix="Consumer.*."/>
+				</amq:virtualDestinations>
+			</amq:virtualDestinationInterceptor>
+		</amq:destinationInterceptors>
+
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue=">" producerFlowControl="true"  memoryLimit="4 mb">
+            <networkBridgeFilterFactory>
+              <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
+            </networkBridgeFilterFactory>
+          </policyEntry>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+
+		<amq:destinations>
+			<!-- topics -->
+			<amq:topic physicalName="VirtualTopic.T1" />
+		</amq:destinations>
+
+	</amq:broker>
+
+</beans>
+	<!-- END SNIPPET: xbean -->


Mime
View raw message