activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject [01/10] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5674 - initialRedeliveryDelay not respected
Date Tue, 04 Aug 2015 15:59:20 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.11.x 256bcf334 -> a37b43cca


https://issues.apache.org/jira/browse/AMQ-5674 - initialRedeliveryDelay not respected


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8ff47d6e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8ff47d6e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8ff47d6e

Branch: refs/heads/activemq-5.11.x
Commit: 8ff47d6edc9d64cac0c8d8ad8bd9d57aa53a891b
Parents: 256bcf3
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Wed Mar 18 17:07:45 2015 +0100
Committer: Daniel Kulp <dkulp@apache.org>
Committed: Tue Aug 4 08:44:04 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/activemq/ActiveMQSession.java     |  4 ++--
 .../java/org/apache/activemq/RedeliveryPolicy.java    |  5 ++++-
 .../src/test/java/org/apache/activemq/ra/MDBTest.java | 14 ++++++++++++--
 .../org/apache/activemq/RedeliveryPolicyTest.java     | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8ff47d6e/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index e327ef1..14c2869 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -937,7 +937,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                         @Override
                         public void afterRollback() throws Exception {
                             LOG.trace("rollback {}", ack, new Throwable("here"));
-                            md.getMessage().onMessageRolledBack();
                             // ensure we don't filter this as a duplicate
                             connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
 
@@ -956,7 +955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                             if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
-                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
+                                && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                 // We need to NACK the messages so that they get
                                 // sent to the
                                 // DLQ.
@@ -986,6 +985,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                                     }
                                 }, redeliveryDelay);
                             }
+                            md.getMessage().onMessageRolledBack();
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8ff47d6e/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
index 91f2b71..e0a8f33 100644
--- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
+++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
@@ -98,7 +98,10 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
     }
 
     public long getNextRedeliveryDelay(long previousDelay) {
-        long nextDelay = redeliveryDelay;
+        long nextDelay = initialRedeliveryDelay;
+        if (nextDelay == 0) {
+            nextDelay = redeliveryDelay;
+        }
 
         if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
             nextDelay = (long) (previousDelay * backOffMultiplier);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8ff47d6e/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index 5927d3e..904dd18 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -23,6 +23,8 @@ import java.lang.reflect.Method;
 import java.util.Timer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
@@ -424,12 +426,18 @@ public class MDBTest extends TestCase {
         adapter.setServerUrl("vm://localhost?broker.persistent=false");
         adapter.start(new StubBootstrapContext());
 
-        final CountDownLatch messageDelivered = new CountDownLatch(2);
+        final CountDownLatch messageDelivered = new CountDownLatch(5);
+        final AtomicLong timeReceived = new AtomicLong();
+        final AtomicBoolean failed = new AtomicBoolean(false);
 
         final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
             public void onMessage(Message message) {
                 super.onMessage(message);
                 try {
+                    long now = System.currentTimeMillis();
+                    if ((now - timeReceived.getAndSet(now)) > 1000) {
+                        failed.set(true);
+                    }
                     messageDelivered.countDown();
                     if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
                         throw new RuntimeException(getName() + " ex on first delivery");
@@ -463,6 +471,7 @@ public class MDBTest extends TestCase {
         ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
         activationSpec.setDestinationType(Queue.class.getName());
         activationSpec.setDestination("TEST");
+        activationSpec.setInitialRedeliveryDelay(100);
         activationSpec.setResourceAdapter(adapter);
         activationSpec.validate();
 
@@ -486,7 +495,7 @@ public class MDBTest extends TestCase {
         } catch (Exception e) {
 
         }
-
+        timeReceived.set(System.currentTimeMillis());
         // Send the broker a message to that endpoint
         MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
         producer.send(session.createTextMessage("Hello!"));
@@ -494,6 +503,7 @@ public class MDBTest extends TestCase {
 
         // Wait for the message to be delivered twice.
         assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+        assertFalse("Delivery policy delay not working", failed.get());
 
         // Shut the Endpoint down.
         adapter.endpointDeactivation(messageEndpointFactory, activationSpec);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8ff47d6e/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index e2b5867..1f8f687 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -69,6 +69,20 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertEquals(500, delay);
     }
 
+    public void testGetNextWithInitialDelay() throws Exception {
+
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(500);
+
+        long delay = policy.getNextRedeliveryDelay(500);
+        assertEquals(500, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(500, delay);
+
+    }
+
     /**
      * @throws Exception
      */


Mime
View raw message