activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [4/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6042
Date Fri, 20 Nov 2015 21:18:47 GMT
https://issues.apache.org/jira/browse/AMQ-6042

Apply fix and test from Martin Lichtin to preserve the rollback cause in
all ack modes.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4a27b723
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4a27b723
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4a27b723

Branch: refs/heads/master
Commit: 4a27b72377bf8104b1a586bb26946c59508a3f26
Parents: cc6213e
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Nov 20 16:18:27 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Nov 20 16:18:27 2015 -0500

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |   5 +-
 .../activemq/MessageListenerRedeliveryTest.java | 132 +++++++++++++++----
 2 files changed, 108 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4a27b723/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 9ccd8e6..edc383f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1401,13 +1401,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
                                 LOG.error("{} Exception while processing message: {}", getConsumerId(),
md.getMessage().getMessageId(), e);
+                                md.setRollbackCause(e);
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() ||
session.isIndividualAcknowledge()) {
                                     // schedual redelivery and possible dlq processing
-                                    md.setRollbackCause(e);
                                     rollback();
                                 } else {
-                                    // Transacted or Client ack: Deliver the
-                                    // next message.
+                                    // Transacted or Client ack: Deliver the next message.
                                     afterMessageIsConsumed(md, false);
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4a27b723/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
index 0fb9728..62f4995 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -33,42 +38,49 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageListenerRedeliveryTest extends TestCase {
+public class MessageListenerRedeliveryTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
 
+    @Rule
+    public TestName name = new TestName();
+
     private Connection connection;
 
-    @Override
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         connection = createConnection();
     }
 
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (connection != null) {
             connection.close();
             connection = null;
         }
     }
 
+    protected String getTestName() {
+        return name.getMethodName();
+    }
+
     protected RedeliveryPolicy getRedeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         redeliveryPolicy.setInitialRedeliveryDelay(0);
         redeliveryPolicy.setRedeliveryDelay(1000);
         redeliveryPolicy.setMaximumRedeliveries(3);
-        redeliveryPolicy.setBackOffMultiplier((short)2);
+        redeliveryPolicy.setBackOffMultiplier((short) 2);
         redeliveryPolicy.setUseExponentialBackOff(true);
         return redeliveryPolicy;
     }
@@ -107,11 +119,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
         }
     }
 
+    @Test(timeout = 60000)
     public void testQueueRollbackConsumerListener() throws JMSException {
         connection.start();
 
         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = session.createQueue("queue-" + getName());
+        Queue queue = session.createQueue("queue-" + getTestName());
         MessageProducer producer = createProducer(session, queue);
         Message message = createTextMessage(session);
         producer.send(message);
@@ -119,7 +132,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
 
         MessageConsumer consumer = session.createConsumer(queue);
 
-        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
+        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
         mc.setRedeliveryPolicy(getRedeliveryPolicy());
 
         TestMessageListener listener = new TestMessageListener(session);
@@ -170,11 +183,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
         session.close();
     }
 
+    @Test(timeout = 60000)
     public void testQueueRollbackSessionListener() throws JMSException {
         connection.start();
 
         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = session.createQueue("queue-" + getName());
+        Queue queue = session.createQueue("queue-" + getTestName());
         MessageProducer producer = createProducer(session, queue);
         Message message = createTextMessage(session);
         producer.send(message);
@@ -182,7 +196,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
 
         MessageConsumer consumer = session.createConsumer(queue);
 
-        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
+        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
         mc.setRedeliveryPolicy(getRedeliveryPolicy());
 
         TestMessageListener listener = new TestMessageListener(session);
@@ -235,11 +249,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
         session.close();
     }
 
-    public void testQueueSessionListenerExceptionRetry() throws  Exception {
+    @Test(timeout = 60000)
+    public void testQueueSessionListenerExceptionRetry() throws Exception {
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("queue-" + getName());
+        Queue queue = session.createQueue("queue-" + getTestName());
         MessageProducer producer = createProducer(session, queue);
         Message message = createTextMessage(session, "1");
         producer.send(message);
@@ -249,7 +264,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
         MessageConsumer consumer = session.createConsumer(queue);
 
         final CountDownLatch gotMessage = new CountDownLatch(2);
-        final AtomicInteger count  = new AtomicInteger(0);
+        final AtomicInteger count = new AtomicInteger(0);
         final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
         final ArrayList<String> received = new ArrayList<String>();
         consumer.setMessageListener(new MessageListener() {
@@ -263,7 +278,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
                     fail(e.toString());
                 }
                 if (count.incrementAndGet() < maxDeliveries) {
-                    throw new RuntimeException(getName() + " force a redelivery");
+                    throw new RuntimeException(getTestName() + " force a redelivery");
                 }
                 // new blood
                 count.set(0);
@@ -273,20 +288,21 @@ public class MessageListenerRedeliveryTest extends TestCase {
 
         assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
 
-        for (int i=0; i<maxDeliveries; i++) {
+        for (int i = 0; i < maxDeliveries; i++) {
             assertEquals("got first redelivered: " + i, "1", received.get(i));
         }
-        for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
+        for (int i = maxDeliveries; i < maxDeliveries * 2; i++) {
             assertEquals("got first redelivered: " + i, "2", received.get(i));
         }
         session.close();
     }
 
-    public void testQueueSessionListenerExceptionDlq() throws  Exception {
+    @Test(timeout = 60000)
+    public void testQueueSessionListenerExceptionDlq() throws Exception {
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("queue-" + getName());
+        Queue queue = session.createQueue("queue-" + getTestName());
         MessageProducer producer = createProducer(session, queue);
         Message message = createTextMessage(session);
         producer.send(message);
@@ -314,7 +330,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
             public void onMessage(Message message) {
                 LOG.info("Message Received: " + message);
                 gotMessage.countDown();
-                throw new RuntimeException(getName() + " force a redelivery");
+                throw new RuntimeException(getTestName() + " force a redelivery");
             }
         });
 
@@ -331,7 +347,70 @@ public class MessageListenerRedeliveryTest extends TestCase {
         LOG.info("DLQ'd message cause reported as: {}", cause);
 
         assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
-        assertTrue("is correct exception", cause.contains(getName()));
+        assertTrue("is correct exception", cause.contains(getTestName()));
+        assertTrue("cause exception is remembered", cause.contains("Throwable"));
+        assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+
+        session.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTransactedQueueSessionListenerExceptionDlq() throws Exception {
+        connection.start();
+
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue("queue-" + getTestName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+        session.commit();
+
+        final Message[] dlqMessage = new Message[1];
+        ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
+        MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
+        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
+        dlqConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("DLQ Message Received: " + message);
+                dlqMessage[0] = message;
+                gotDlqMessage.countDown();
+            }
+        });
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
+        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Message Received: " + message);
+                gotMessage.countDown();
+                try {
+                    session.rollback();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+                throw new RuntimeException(getTestName() + " force a redelivery");
+            }
+        });
+
+        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
+
+        // check DLQ
+        assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
+
+        // check DLQ message cause is captured
+        message = dlqMessage[0];
+        assertNotNull("dlq message captured", message);
+        String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+
+        LOG.info("DLQ'd message cause reported as: {}", cause);
+
+        assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
+        assertTrue("is correct exception", cause.contains(getTestName()));
         assertTrue("cause exception is remembered", cause.contains("Throwable"));
         assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
 
@@ -341,6 +420,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
     private TextMessage createTextMessage(Session session, String text) throws JMSException
{
         return session.createTextMessage(text);
     }
+
     private TextMessage createTextMessage(Session session) throws JMSException {
         return session.createTextMessage("Hello");
     }


Mime
View raw message