activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5068 - don't rewrite durable subs as the message instance is shared
Date Wed, 26 Mar 2014 11:50:34 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 266d23ef7 -> 75eb814ca


https://issues.apache.org/jira/browse/AMQ-5068 - don't rewrite durable subs as the message instance is shared


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/75eb814c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/75eb814c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/75eb814c

Branch: refs/heads/trunk
Commit: 75eb814ca7f05a78129628606488d47d89f7cf85
Parents: 266d23e
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 26 11:50:13 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 26 11:50:13 2014 +0000

----------------------------------------------------------------------
 .../broker/region/policy/PolicyEntry.java       |  3 +-
 .../activemq/broker/RedeliveryRestartTest.java  | 57 +++++++++++++++++---
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/75eb814c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 624d490..f7ae6c1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -128,6 +128,7 @@ public class PolicyEntry extends DestinationMapEntry {
         queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
         queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
         queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
+        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
     }
 
     public void update(Queue queue) {
@@ -142,6 +143,7 @@ public class PolicyEntry extends DestinationMapEntry {
         queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
         queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
         queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
+        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
     }
 
     public void configure(Broker broker,Topic topic) {
@@ -197,7 +199,6 @@ public class PolicyEntry extends DestinationMapEntry {
         destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
         destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
-        destination.setPersistJMSRedelivered(isPersistJMSRedelivered());
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/75eb814c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
index 8eba729..032934b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
@@ -24,11 +24,13 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.transport.failover.FailoverTransport;
 import org.junit.After;
 import org.junit.Before;
@@ -91,10 +93,9 @@ public class RedeliveryRestartTest extends TestSupport {
         connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(10, queueName, connection);
-
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
 
         MessageConsumer consumer = session.createConsumer(destination);
         TextMessage msg = null;
@@ -136,17 +137,59 @@ public class RedeliveryRestartTest extends TestSupport {
     }
 
     @org.junit.Test
-    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+    public void testDurableSubRedeliveryFlagAfterRestartNotSupported() throws Exception {
 
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
             + ")?jms.prefetchPolicy.all=0");
         connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setClientID("id");
         connection.start();
 
-        populateDestination(10, queueName, connection);
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ActiveMQTopic destination = new ActiveMQTopic(queueName);
+
+        TopicSubscriber durableSub = session.createDurableSubscriber(destination, "id");
+
+        populateDestination(10, destination, connection);
+
+        TextMessage msg = null;
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) durableSub.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        durableSub.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted auto assigned port
+        connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
+                .getPublishableConnectString());
+
+        durableSub = session.createDurableSubscriber(destination, "id");
+        for (int i = 0; i < 10; i++) {
+            msg = (TextMessage) durableSub.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("no reDelivery flag", false, msg.getJMSRedelivered());
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
 
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
 
         MessageConsumer consumer = session.createConsumer(destination);
         TextMessage msg = null;
@@ -196,10 +239,9 @@ public class RedeliveryRestartTest extends TestSupport {
         connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(1, queueName, connection);
-
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Destination destination = session.createQueue(queueName);
+        populateDestination(1, destination, connection);
 
         MessageConsumer consumer = session.createConsumer(destination);
         TextMessage msg = (TextMessage) consumer.receive(5000);
@@ -243,9 +285,8 @@ public class RedeliveryRestartTest extends TestSupport {
         return broker;
     }
 
-    private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
+    private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createQueue(destinationName);
         MessageProducer producer = session.createProducer(destination);
         for (int i = 1; i <= nbMessages; i++) {
             producer.send(session.createTextMessage("<hello id='" + i + "'/>"));


Mime
View raw message