activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r982903 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/ test/java/org/apache/activemq/bugs/
Date Fri, 06 Aug 2010 08:10:53 GMT
Author: gtully
Date: Fri Aug  6 08:10:52 2010
New Revision: 982903

URL: http://svn.apache.org/viewvc?rev=982903&view=rev
Log:
resolve: fix https://issues.apache.org/activemq/browse/AMQ-1847 - set initialRedeliveryDelay=0
to get a non-delayed first redelivery; use redeliveryDelay to set the base for subsequent redeliveries.
Existing users with an initialRedeliveryDelay set will now see it being respected.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Aug  6 08:10:52 2010
@@ -1099,11 +1099,13 @@ public class ActiveMQMessageConsumer imp
                     return;
                 }
     
-                // Only increase the redelivery delay after the first redelivery..
+                // use initial delay for first redelivery
                 MessageDispatch lastMd = deliveredMessages.getFirst();
                 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
                 if (currentRedeliveryCount > 0) {
-                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
+                } else {
+                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                 }
                 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri Aug  6 08:10:52 2010
@@ -874,9 +874,9 @@ public class ActiveMQSession implements 
 
                                 // Figure out how long we should wait to resend
                                 // this message.
-                                long redeliveryDelay = 0;
+                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                 for (int i = 0; i < redeliveryCounter; i++) {
-                                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                 }
                                 scheduler.executeAfterDelay(new Runnable() {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Fri Aug  6 08:10:52 2010
@@ -38,6 +38,7 @@ public class RedeliveryPolicy implements
     private boolean useCollisionAvoidance;
     private boolean useExponentialBackOff;
     private double backOffMultiplier = 5.0;
+    private long redeliveryDelay = initialRedeliveryDelay;
 
     public RedeliveryPolicy() {
     }
@@ -82,15 +83,15 @@ public class RedeliveryPolicy implements
         this.maximumRedeliveries = maximumRedeliveries;
     }
 
-    public long getRedeliveryDelay(long previousDelay) {
-        long redeliveryDelay;
+    public long getNextRedeliveryDelay(long previousDelay) {
+        long nextDelay;
 
         if (previousDelay == 0) {
-            redeliveryDelay = initialRedeliveryDelay;
+            nextDelay = redeliveryDelay;
         } else if (useExponentialBackOff && backOffMultiplier > 1) {
-            redeliveryDelay = (long) (previousDelay * backOffMultiplier);
+            nextDelay = (long) (previousDelay * backOffMultiplier);
         } else {
-            redeliveryDelay = previousDelay;
+            nextDelay = previousDelay;
         }
 
         if (useCollisionAvoidance) {
@@ -100,10 +101,10 @@ public class RedeliveryPolicy implements
              */
             Random random = getRandomNumberGenerator();
             double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
-            redeliveryDelay += redeliveryDelay * variance;
+            nextDelay += nextDelay * variance;
         }
 
-        return redeliveryDelay;
+        return nextDelay;
     }
 
     public boolean isUseCollisionAvoidance() {
@@ -129,4 +130,11 @@ public class RedeliveryPolicy implements
         return randomNumberGenerator;
     }
 
+    public void setRedeliveryDelay(long redeliveryDelay) {
+        this.redeliveryDelay = redeliveryDelay;
+    }
+
+    public long getRedeliveryDelay() {
+        return redeliveryDelay;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java Fri Aug  6 08:10:52 2010
@@ -62,7 +62,8 @@ public class MessageListenerRedeliveryTe
 
     protected RedeliveryPolicy getRedeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-        redeliveryPolicy.setInitialRedeliveryDelay(1000);
+        redeliveryPolicy.setInitialRedeliveryDelay(0);
+        redeliveryPolicy.setRedeliveryDelay(1000);
         redeliveryPolicy.setMaximumRedeliveries(3);
         redeliveryPolicy.setBackOffMultiplier((short)2);
         redeliveryPolicy.setUseExponentialBackOff(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Fri Aug  6 08:10:52 2010
@@ -47,7 +47,8 @@ public class RedeliveryPolicyTest extend
 
         // Receive a message with the JMS API
         RedeliveryPolicy policy = connection.getRedeliveryPolicy();
-        policy.setInitialRedeliveryDelay(500);
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(500);              
         policy.setBackOffMultiplier((short) 2);
         policy.setUseExponentialBackOff(true);
         
@@ -102,8 +103,9 @@ public class RedeliveryPolicyTest extend
 
         // Receive a message with the JMS API
         RedeliveryPolicy policy = connection.getRedeliveryPolicy();
-        policy.setInitialRedeliveryDelay(500);
-        
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(500);
+
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue(getName());
@@ -303,5 +305,128 @@ public class RedeliveryPolicyTest extend
      
   
        
-    }    
+    }
+
+
+    public void testInitialRedeliveryDelayZero() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(1);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+        session.commit();
+    }
+
+
+    public void testInitialRedeliveryDelayOne() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(1000);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(1);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNull(m);
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
+
+    public void testRedeliveryDelayOne() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(1000);
+        policy.setUseExponentialBackOff(false);
+        policy.setMaximumRedeliveries(2);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull("first immediate redelivery", m);
+        session.rollback();
+
+        m = (TextMessage)consumer.receive(100);
+        assertNull("second delivery delayed: " + m, m);
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java Fri Aug  6 08:10:52 2010
@@ -57,7 +57,7 @@ public class AMQ2021Test extends TestCas
     AMQ2021Test testCase;
     
     String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";    
-    String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1";
+    String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
     
     private int numMessages = 1000;
     private int numConsumers = 2;



Mime
View raw message