activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5843
Date Tue, 16 Jun 2015 17:37:29 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 540d8c707 -> 7b5c8be37


https://issues.apache.org/jira/browse/AMQ-5843

Adds a new property on PolicyEntry called includeBodyForAdvisory. When it is
enabled, advisory messages that carry the original message keep the original
message body instead of clearing it out.  This is turned off by
default.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/edacc2a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/edacc2a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/edacc2a8

Branch: refs/heads/master
Commit: edacc2a8404d1a460fb08edd979285961802c0ac
Parents: 540d8c7
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Mon Jun 15 17:38:49 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jun 16 12:48:38 2015 -0400

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 27 +++++++--
 .../activemq/broker/region/BaseDestination.java |  9 +++
 .../broker/region/policy/PolicyEntry.java       | 22 +++++++
 .../apache/activemq/advisory/AdvisoryTests.java | 63 +++++++++++++++++++-
 4 files changed, 115 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/edacc2a8/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 39cd2fe..7a6915d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
@@ -350,7 +351,9 @@ public class AdvisoryBroker extends BrokerFilter {
             if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
-                payload.clearBody();
+                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                    payload.clearBody();
+                }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
@@ -367,7 +370,9 @@ public class AdvisoryBroker extends BrokerFilter {
             if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
-                payload.clearBody();
+                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                    payload.clearBody();
+                }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                 ActiveMQDestination destination = payload.getDestination();
@@ -388,7 +393,9 @@ public class AdvisoryBroker extends BrokerFilter {
             if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
-                payload.clearBody();
+                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                    payload.clearBody();
+                }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                 ActiveMQDestination destination = payload.getDestination();
@@ -409,7 +416,9 @@ public class AdvisoryBroker extends BrokerFilter {
             if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
-                payload.clearBody();
+                if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                    payload.clearBody();
+                }
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 if (sub instanceof TopicSubscription) {
                     advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
@@ -498,7 +507,9 @@ public class AdvisoryBroker extends BrokerFilter {
                 if (!messageReference.isAdvisory()) {
                     ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
                     Message payload = messageReference.getMessage().copy();
-                    payload.clearBody();
+                    if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
+                        payload.clearBody();
+                    }
                     fireAdvisory(context, topic, payload);
                 }
             } catch (Exception e) {
@@ -551,6 +562,12 @@ public class AdvisoryBroker extends BrokerFilter {
         }
     }
 
+    protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) {
+        Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination);
+        return (destination instanceof BaseDestination &&
+                ((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false;
+    }
+
     private void handleFireFailure(String message, Throwable cause) {
         LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
         LOG.debug("{} detail: {}", message, cause);

http://git-wip-us.apache.org/repos/asf/activemq/blob/edacc2a8/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 5d51b24..da6ca41 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -84,6 +84,7 @@ public abstract class BaseDestination implements Destination {
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
     private boolean sendAdvisoryIfNoConsumers;
+    private boolean includeBodyForAdvisory;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
@@ -466,6 +467,14 @@ public abstract class BaseDestination implements Destination {
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
+    public boolean isIncludeBodyForAdvisory() {
+        return includeBodyForAdvisory;
+    }
+
+    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
+        this.includeBodyForAdvisory = includeBodyForAdvisory;
+    }
+
     /**
      * @return the dead letter strategy
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/edacc2a8/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 26cfa6b..2e8c2a7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -81,6 +81,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private boolean advisoryWhenFull;
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
+    private boolean includeBodyForAdvisory;
     private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
     private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
     private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
@@ -200,6 +201,7 @@ public class PolicyEntry extends DestinationMapEntry {
         destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
         destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
+        destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
         destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
     }
 
@@ -740,6 +742,26 @@ public class PolicyEntry extends DestinationMapEntry {
         this.advisoryForFastProducers = advisoryForFastProducers;
     }
 
+    /**
+     * Returns true if the original message body should be included when applicable
+     * for advisory messages
+     *
+     * @return
+     */
+    public boolean isIncludeBodyForAdvisory() {
+        return includeBodyForAdvisory;
+    }
+
+    /**
+     * Sets if the original message body should be included when applicable
+     * for advisory messages
+     *
+     * @param includeBodyForAdvisory
+     */
+    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
+        this.includeBodyForAdvisory = includeBodyForAdvisory;
+    }
+
     public void setMaxExpirePageSize(int maxExpirePageSize) {
         this.maxExpirePageSize = maxExpirePageSize;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/edacc2a8/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 1ad1ef4..ce072aa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -44,10 +47,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test for advisory messages sent under the right circumstances.
  */
+@RunWith(Parameterized.class)
 public class AdvisoryTests {
 
     protected static final int MESSAGE_COUNT = 2000;
@@ -55,9 +62,25 @@ public class AdvisoryTests {
     protected Connection connection;
     protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
     protected int topicCount;
-
+    protected final boolean includeBodyForAdvisory;
     protected final int EXPIRE_MESSAGE_PERIOD = 10000;
 
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                // Include the full body of the message
+                {true},
+                // Don't include the full body of the message
+                {false}
+        });
+    }
+
+    public AdvisoryTests(boolean includeBodyForAdvisory) {
+        super();
+        this.includeBodyForAdvisory = includeBodyForAdvisory;
+    }
+
     @Test(timeout = 60000)
     public void testNoSlowConsumerAdvisory() throws Exception {
         Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -122,6 +145,11 @@ public class AdvisoryTests {
 
         Message msg = advisoryConsumer.receive(1000);
         assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
     }
 
     @Test(timeout = 60000)
@@ -149,6 +177,9 @@ public class AdvisoryTests {
         ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
         String originalId = payload.getJMSMessageID();
         assertEquals(originalId, id);
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
     }
 
     @Test(timeout = 60000)
@@ -171,6 +202,11 @@ public class AdvisoryTests {
 
         Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
         assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
     }
 
     @Test(timeout = 60000)
@@ -185,14 +221,24 @@ public class AdvisoryTests {
         for (int i = 0; i < 100; i++) {
             s.createConsumer(advisoryTopic);
         }
+        MessageConsumer advisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic((ActiveMQDestination) topic));
 
         MessageProducer producer = s.createProducer(topic);
         int count = 10;
         for (int i = 0; i < count; i++) {
             BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
             producer.send(m);
         }
 
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        //Add assertion to make sure body is included for DLQ advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
+
         // we should get here without StackOverflow
     }
 
@@ -211,11 +257,17 @@ public class AdvisoryTests {
         int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
         for (int i = 0; i < count; i++) {
             BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
             producer.send(m);
         }
 
         Message msg = advisoryConsumer.receive(1000);
         assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
     }
 
     @Before
@@ -258,6 +310,7 @@ public class AdvisoryTests {
         policy.setAdvisoryForDiscardingMessages(true);
         policy.setAdvisoryForSlowConsumers(true);
         policy.setAdvisoryWhenFull(true);
+        policy.setIncludeBodyForAdvisory(includeBodyForAdvisory);
         policy.setProducerFlowControl(false);
         ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
         strategy.setLimit(10);
@@ -269,4 +322,12 @@ public class AdvisoryTests {
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
     }
+
+    protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {
+        if (includeBodyForAdvisory) {
+            assertNotNull(payload.getContent());
+        } else {
+            assertNull(payload.getContent());
+        }
+    }
 }


Mime
View raw message