activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5146 - validate redeliveryPolicy excess pre dispatch, fix and test
Date Wed, 16 Apr 2014 14:55:01 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk e94792751 -> fad1dd0f1


https://issues.apache.org/jira/browse/AMQ-5146 - validate redeliveryPolicy excess pre dispatch,
fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fad1dd0f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fad1dd0f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fad1dd0f

Branch: refs/heads/trunk
Commit: fad1dd0f1727af8ce6e0d05a51c32cc63f658e24
Parents: e947927
Author: gtully <gary.tully@gmail.com>
Authored: Wed Apr 16 15:48:27 2014 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Apr 16 15:49:34 2014 +0100

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  42 +++-
 .../apache/activemq/RedeliveryPolicyTest.java   | 226 ++++++++++++++++++-
 2 files changed, 252 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fad1dd0f/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index d862f70..89dbc81 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -497,6 +497,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
+                } else if (redeliveryExceeded(md)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(getConsumerId() + " received with excessive redelivered:
" + md);
+                    }
+                    posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery
policy limit:" + redeliveryPolicy);
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace(getConsumerId() + " received message: " + md);
@@ -510,6 +515,25 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
         }
     }
 
+    private void posionAck(MessageDispatch md, String cause) throws JMSException {
+        MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+        posionAck.setFirstMessageId(md.getMessage().getMessageId());
+        posionAck.setPoisonCause(new Throwable(cause));
+        session.sendAck(posionAck);
+    }
+
+    private boolean redeliveryExceeded(MessageDispatch md) {
+        try {
+            return redeliveryPolicy != null
+                    && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                    && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
+                    // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+                    && md.getMessage().getProperty("redeliveryDelay") == null;
+        } catch (IOException ignored) {
+            return false;
+        }
+    }
+
     /**
      * Receives the next message produced for this message consumer.
      * <P>
@@ -1353,6 +1377,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
{
                         if (listener != null && unconsumedMessages.isRunning()) {
+                            if (redeliveryExceeded(md)) {
+                                posionAck(md, "dispatch to " + getConsumerId() + " exceeds
redelivery policy limit:" + redeliveryPolicy);
+                                return;
+                            }
                             ActiveMQMessage message = createActiveMQMessage(md);
                             beforeMessageIsConsumed(md);
                             try {
@@ -1386,10 +1414,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                     } else {
                         if (!session.isTransacted()) {
                             LOG.warn("Duplicate non transacted dispatch to consumer: "  +
getConsumerId() + ", poison acking: " + md);
-                            MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE,
1);
-                            poisonAck.setFirstMessageId(md.getMessage().getMessageId());
-                            poisonAck.setPoisonCause(new Throwable("Duplicate non transacted
delivery to " + getConsumerId()));
-                            session.sendAck(poisonAck);
+                            posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(getConsumerId() + " tracking transacted redelivery
of duplicate: " + md.getMessage());
@@ -1405,14 +1430,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                 }
                             }
                             if (needsPoisonAck) {
-                                MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE,
1);
-                                poisonAck.setFirstMessageId(md.getMessage().getMessageId());
-                                poisonAck.setPoisonCause(new JMSException("Duplicate dispatch
with transacted redeliver pending on another consumer, connection: "
-                                        + session.getConnection().getConnectionInfo().getConnectionId()));
                                 LOG.warn("acking duplicate delivery as poison, redelivery
must be pending to another"
                                         + " consumer on this connection, failoverRedeliveryWaitPeriod="
-                                        + failoverRedeliveryWaitPeriod + ". Message: " +
md + ", poisonAck: " + poisonAck);
-                                session.sendAck(poisonAck);
+                                        + failoverRedeliveryWaitPeriod + ". Message: " +
md);
+                                posionAck(md, "Duplicate dispatch with transacted redeliver
pending on another consumer, connection: "
+                                        + session.getConnection().getConnectionInfo().getConnectionId());
                             } else {
                                 if (transactedIndividualAck) {
                                     immediateIndividualTransactedAck(md);

http://git-wip-us.apache.org/repos/asf/activemq/blob/fad1dd0f/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 02db378..e2b5867 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -16,23 +16,28 @@
  */
 package org.apache.activemq;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
 import junit.framework.Test;
 
 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 
-/**
- * Test cases used to test the JMS message exclusive consumers.
- *
- *
- */
 public class RedeliveryPolicyTest extends JmsTestSupport {
 
     public static Test suite() {
@@ -383,6 +388,215 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
 
     }
 
+    public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        for (int i=0;i<=maxRedeliveries +1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+            // Receive a message with the JMS API
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
+            if (i<=maxRedeliveries) {
+                assertEquals("1st", m.getText());
+                assertEquals(i, m.getRedeliveryCounter());
+            } else {
+                assertNull("null on exceeding redelivery count", m);
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
+
+    public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(destination);
+            final CountDownLatch done = new CountDownLatch(1);
+
+            consumer.setMessageListener(new MessageListener(){
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage)message;
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done", done.await(5, TimeUnit.SECONDS));
+            } else {
+                // final redlivery gets poisoned before dispatch
+                assertFalse("listener done", done.await(1, TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
+    public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final int maxRedeliveries = 4;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            final ActiveMQSession session = (ActiveMQSession) connection.createSession(true,
Session.SESSION_TRANSACTED);
+            session.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            connection.createConnectionConsumer(
+                    destination,
+                    null,
+                    new ServerSessionPool() {
+                        @Override
+                        public ServerSession getServerSession() throws JMSException {
+                            return new ServerSession() {
+                                @Override
+                                public Session getSession() throws JMSException {
+                                    return session;
+                                }
+
+                                @Override
+                                public void start() throws JMSException {
+                                }
+                            };
+                        }
+                    },
+                    100,
+                    false);
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    session.run();
+                    return done.await(10, TimeUnit.MILLISECONDS);
+                }
+            });
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
+            } else {
+                // final redlivery gets poisoned before dispatch
+                assertFalse("listener not done @" + i, done.await(1, TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
 
     public void testInitialRedeliveryDelayZero() throws Exception {
 


Mime
View raw message