activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5541 - support preemptive redelivery for non persistent case, fix and test
Date Tue, 27 Jan 2015 16:45:39 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk bae0e60a7 -> dc25f2a8f


https://issues.apache.org/jira/browse/AMQ-5541 - support preemptive redelivery for non persistent
case, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dc25f2a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dc25f2a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dc25f2a8

Branch: refs/heads/trunk
Commit: dc25f2a8f9d4e54c2349946b1337eb4b72890e07
Parents: bae0e60
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jan 27 16:43:35 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jan 27 16:43:35 2015 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/RegionBroker.java    |  6 +-
 .../RedeliveryRestartWithExceptionTest.java     | 59 +++++++++++++++++++-
 2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dc25f2a8/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 2943c98..3acc135 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -623,11 +623,13 @@ public class RegionBroker extends EmptyBroker {
                 long totalTime = endTime - message.getBrokerInTime();
                 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
             }
-            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
&& !message.isRedelivered() && message.isPersistent()) {
+            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
&& !message.isRedelivered()) {
                 final int originalValue = message.getRedeliveryCounter();
                 message.incrementRedeliveryCounter();
                 try {
-                    ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
+                    if (message.isPersistent()) {
+                        ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
+                    }
                     messageDispatch.setTransmitCallback(new TransmitCallback() {
                         // dispatch is considered a delivery, so update sub state post dispatch
otherwise
                         // on a disconnect/reconnect cached messages will not reflect initial
delivery attempt

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc25f2a8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
index eae86d6..5d8b62e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
@@ -44,6 +45,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.usage.SystemUsage;
 import org.junit.After;
 import org.junit.Before;
@@ -97,7 +99,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
 
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         Destination destination = session.createQueue(queueName);
-        populateDestination(10, destination, connection);
+        populateDestination(10, destination, connection, true);
         TextMessage msg = null;
         MessageConsumer consumer = session.createConsumer(destination);
         Exception expectedException = null;
@@ -165,7 +167,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
 
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         Destination destination = session.createQueue(queueName);
-        populateDestination(10, destination, connection);
+        populateDestination(10, destination, connection, true);
         TextMessage msg = null;
         MessageConsumer consumer = session.createConsumer(destination);
         Exception expectedException = null;
@@ -218,6 +220,56 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
         connection.close();
     }
 
+    @org.junit.Test
+    public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop()
throws Exception {
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection, false);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(5000);
+            assertNotNull("got the message", msg);
+            assertFalse("not redelivered", msg.getJMSRedelivered());
+        }
+
+        connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new
IOException("Die"));
+
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("redelivery flag set on:" + i, true, msg.getJMSRedelivered());
+            assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount")
> 1);
+            msg.acknowledge();
+        }
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
     private void restartBroker() throws Exception {
         broker.stop();
         broker.waitUntilStopped();
@@ -231,9 +283,10 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
         return broker;
     }
 
-    private void populateDestination(final int nbMessages, final Destination destination,
javax.jms.Connection connection) throws JMSException {
+    private void populateDestination(final int nbMessages, final Destination destination,
javax.jms.Connection connection, boolean persistent) throws JMSException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
         for (int i = 1; i <= nbMessages; i++) {
             producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
         }


Mime
View raw message