activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6070 - rework for virtual topic case, use the destination from the transient region destination rather than the message, such that consumer queue advisories work for delivered etc
Date Tue, 08 Dec 2015 14:51:51 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 1ebfa9ade -> c67590104


https://issues.apache.org/jira/browse/AMQ-6070 - rework for virtual topic case, use the destination
from the transient region destination rather than the message, such that consumer queue advisories
work for delivered etc

(cherry picked from commit 179dc3acb28a8a7fc3c1eddf6c6ac54fe49836a5)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/558dcc04
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/558dcc04
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/558dcc04

Branch: refs/heads/activemq-5.13.x
Commit: 558dcc0479bc355fd5b2ecf0d62d1caefbb05fce
Parents: 1ebfa9a
Author: gtully <gary.tully@gmail.com>
Authored: Tue Dec 8 11:04:58 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Dec 8 09:51:23 2015 -0500

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 47 ++++++++------------
 .../apache/activemq/broker/region/Topic.java    |  3 +-
 .../apache/activemq/advisory/AdvisoryTests.java | 47 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/558dcc04/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 6a485fc..d7c9aa8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -426,9 +426,10 @@ public class AdvisoryBroker extends BrokerFilter {
         super.messageExpired(context, messageReference, subscription);
         try {
             if (!messageReference.isAdvisory()) {
-                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
+                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
+                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());
                 Message payload = messageReference.getMessage().copy();
-                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                if (!baseDestination.isIncludeBodyForAdvisory()) {
                     payload.clearBody();
                 }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
@@ -445,17 +446,15 @@ public class AdvisoryBroker extends BrokerFilter {
         super.messageConsumed(context, messageReference);
         try {
             if (!messageReference.isAdvisory()) {
-                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
+                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
+                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination());
                 Message payload = messageReference.getMessage().copy();
-                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                if (!baseDestination.isIncludeBodyForAdvisory()) {
                     payload.clearBody();
                 }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
-                ActiveMQDestination destination = payload.getDestination();
-                if (destination != null) {
-                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
-                }
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
@@ -468,17 +467,15 @@ public class AdvisoryBroker extends BrokerFilter {
         super.messageDelivered(context, messageReference);
         try {
             if (!messageReference.isAdvisory()) {
-                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
+                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
+                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination());
                 Message payload = messageReference.getMessage().copy();
-                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                if (!baseDestination.isIncludeBodyForAdvisory()) {
                     payload.clearBody();
                 }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
-                ActiveMQDestination destination = payload.getDestination();
-                if (destination != null) {
-                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
-                }
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
@@ -491,9 +488,10 @@ public class AdvisoryBroker extends BrokerFilter {
         super.messageDiscarded(context, sub, messageReference);
         try {
             if (!messageReference.isAdvisory()) {
-                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
+                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
+                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination());
                 Message payload = messageReference.getMessage().copy();
-                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                if (!baseDestination.isIncludeBodyForAdvisory()) {
                     payload.clearBody();
                 }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
@@ -502,10 +500,8 @@ public class AdvisoryBroker extends BrokerFilter {
                 }
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
-                ActiveMQDestination destination = payload.getDestination();
-                if (destination != null) {
-                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
-                }
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
+
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
@@ -716,9 +712,10 @@ public class AdvisoryBroker extends BrokerFilter {
         if (wasDLQd) {
             try {
                 if (!messageReference.isAdvisory()) {
-                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
+                    BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
+                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination());
                     Message payload = messageReference.getMessage().copy();
-                    if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                    if (!baseDestination.isIncludeBodyForAdvisory()) {
                         payload.clearBody();
                     }
                     fireAdvisory(context, topic, payload);
@@ -773,12 +770,6 @@ public class AdvisoryBroker extends BrokerFilter {
         }
     }
 
-    protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) {
-        Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination);
-        return (destination instanceof BaseDestination &&
-                ((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false;
-    }
-
     private void handleFireFailure(String message, Throwable cause) {
         LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
         LOG.debug("{} detail: {}", message, cause);

http://git-wip-us.apache.org/repos/asf/activemq/blob/558dcc04/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 02c5fbe..8c63c02 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -356,6 +356,8 @@ public class Topic extends BaseDestination implements Task {
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                 && !context.isInRecoveryMode();
 
+        message.setRegionDestination(this);
+
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if (message.isExpired()) {
@@ -494,7 +496,6 @@ public class Topic extends BaseDestination implements Task {
     synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
             throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
-        message.setRegionDestination(this);
         message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
         Future<Object> result = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/558dcc04/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index cdd8d99..9acf7bf 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -44,9 +45,10 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -67,7 +69,7 @@ public class AdvisoryTests {
     protected final int EXPIRE_MESSAGE_PERIOD = 10000;
 
 
-    @Parameters
+    @Parameters(name = "includeBodyForAdvisory={0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
                 // Include the full body of the message
@@ -293,6 +295,47 @@ public class AdvisoryTests {
         assertIncludeBodyForAdvisory(payload);
     }
 
+    @Test(timeout = 60000)
+    public void testMessageDeliveryVTAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQTopic vt = new ActiveMQTopic("VirtualTopic.TEST");
+
+        ActiveMQQueue a  = new ActiveMQQueue("Consumer.A.VirtualTopic.TEST");
+        MessageConsumer consumer = s.createConsumer(a);
+
+        ActiveMQQueue b = new ActiveMQQueue("Consumer.B.VirtualTopic.TEST");
+        MessageConsumer consumerB = s.createConsumer(b);
+
+        assertNotNull(consumer);
+        assertNotNull(consumerB);
+
+        HashSet<String> dests = new HashSet<String>();
+        dests.add(vt.getQualifiedName());
+        dests.add(a.getQualifiedName());
+        dests.add(b.getQualifiedName());
+
+
+        Topic advisoryTopic = new ActiveMQTopic(AdvisorySupport.MESSAGE_DELIVERED_TOPIC_PREFIX + ">");
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+
+        // throw messages at the vt
+        MessageProducer producer = s.createProducer(vt);
+
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[1024]);
+        producer.send(m);
+
+        Message msg = null;
+        while ((msg = advisoryConsumer.receive(1000)) != null) {
+            ActiveMQMessage message = (ActiveMQMessage) msg;
+            String dest = (String) message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION);
+            dests.remove(dest);
+            assertIncludeBodyForAdvisory((ActiveMQMessage) message.getDataStructure());
+        }
+
+        assertTrue("Got delivered for all: " + dests, dests.isEmpty());
+    }
+
     @Before
     public void setUp() throws Exception {
         if (broker == null) {


Mime
View raw message