activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1078189 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/RedeliveryPolicy.java test/java/org/apache/activemq/RedeliveryPolicyTest.java
Date Fri, 04 Mar 2011 22:39:12 GMT
Author: tabish
Date: Fri Mar  4 22:39:11 2011
New Revision: 1078189

URL: http://svn.apache.org/viewvc?rev=1078189&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-3045

Adds an optional maximumRedeliveryDelay that defaults to -1 to indicate no maximum, so behavior
is unchanged from previous versions.  When exponential backoff is enabled and maximumRedeliveryDelay
is set to a value other than -1, the delay returned by getNextRedeliveryDelay is capped at the larger
of maximumRedeliveryDelay and redeliveryDelay, so the cap never produces a delay smaller than the
configured redelivery delay.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=1078189&r1=1078188&r2=1078189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Fri Mar  4 22:39:11 2011
@@ -22,9 +22,9 @@ import java.util.Random;
 /**
  * Configuration options used to control how messages are re-delivered when they
  * are rolled back.
- * 
+ *
  * @org.apache.xbean.XBean element="redeliveryPolicy"
- * 
+ *
  */
 public class RedeliveryPolicy implements Cloneable, Serializable {
 
@@ -34,6 +34,7 @@ public class RedeliveryPolicy implements
     // +/-15% for a 30% spread -cgs
     private double collisionAvoidanceFactor = 0.15d;
     private int maximumRedeliveries = 6;
+    private long maximumRedeliveryDelay = -1;
     private long initialRedeliveryDelay = 1000L;
     private boolean useCollisionAvoidance;
     private boolean useExponentialBackOff;
@@ -75,6 +76,14 @@ public class RedeliveryPolicy implements
         this.initialRedeliveryDelay = initialRedeliveryDelay;
     }
 
+    public long getMaximumRedeliveryDelay() {
+        return maximumRedeliveryDelay;
+    }
+
+    public void setMaximumRedeliveryDelay(long maximumRedeliveryDelay) {
+        this.maximumRedeliveryDelay = maximumRedeliveryDelay;
+    }
+
     public int getMaximumRedeliveries() {
         return maximumRedeliveries;
     }
@@ -90,6 +99,10 @@ public class RedeliveryPolicy implements
             nextDelay = redeliveryDelay;
         } else if (useExponentialBackOff && backOffMultiplier > 1) {
             nextDelay = (long) (previousDelay * backOffMultiplier);
+            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
+                // in case the user made max redelivery delay less than redelivery delay for some reason.
+                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
+            }
         } else {
             nextDelay = previousDelay;
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?rev=1078189&r1=1078188&r2=1078189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Fri Mar  4 22:39:11 2011
@@ -27,8 +27,8 @@ import org.apache.activemq.command.Activ
 
 /**
  * Test cases used to test the JMS message exclusive consumers.
- * 
- * 
+ *
+ *
  */
 public class RedeliveryPolicyTest extends JmsTestSupport {
 
@@ -48,42 +48,42 @@ public class RedeliveryPolicyTest extend
         // Receive a message with the JMS API
         RedeliveryPolicy policy = connection.getRedeliveryPolicy();
         policy.setInitialRedeliveryDelay(0);
-        policy.setRedeliveryDelay(500);              
+        policy.setRedeliveryDelay(500);
         policy.setBackOffMultiplier((short) 2);
         policy.setUseExponentialBackOff(true);
-        
+
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue(getName());
         MessageProducer producer = session.createProducer(destination);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
 
         // Send the messages
         producer.send(session.createTextMessage("1st"));
         producer.send(session.createTextMessage("2nd"));
         session.commit();
-        
+
         TextMessage m;
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
 
         // No delay on first rollback..
         m = (TextMessage)consumer.receive(100);
         assertNotNull(m);
         session.rollback();
-        
+
         // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
-        
+
         m = (TextMessage)consumer.receive(700);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
-        
+
         // Show re-delivery delay is incrementing exponentially
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
@@ -91,8 +91,8 @@ public class RedeliveryPolicyTest extend
         assertNull(m);
         m = (TextMessage)consumer.receive(700);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        
+        assertEquals("1st", m.getText());
+
     }
 
 
@@ -110,41 +110,41 @@ public class RedeliveryPolicyTest extend
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue(getName());
         MessageProducer producer = session.createProducer(destination);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
 
         // Send the messages
         producer.send(session.createTextMessage("1st"));
         producer.send(session.createTextMessage("2nd"));
         session.commit();
-        
+
         TextMessage m;
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
 
         // No delay on first rollback..
         m = (TextMessage)consumer.receive(100);
         assertNotNull(m);
         session.rollback();
-        
+
         // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
         m = (TextMessage)consumer.receive(700);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
-        
+
         // The message gets redelivered after 500 ms every time since
         // we are not using exponential backoff.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
         m = (TextMessage)consumer.receive(700);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        
+        assertEquals("1st", m.getText());
+
     }
 
     /**
@@ -157,12 +157,12 @@ public class RedeliveryPolicyTest extend
         policy.setInitialRedeliveryDelay(100);
         policy.setUseExponentialBackOff(false);
         policy.setMaximumRedeliveries(2);
-        
+
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue("TEST");
         MessageProducer producer = session.createProducer(destination);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
         MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
 
@@ -170,38 +170,38 @@ public class RedeliveryPolicyTest extend
         producer.send(session.createTextMessage("1st"));
         producer.send(session.createTextMessage("2nd"));
         session.commit();
-        
+
         TextMessage m;
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
 
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
 
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
-        
-        // The last rollback should cause the 1st message to get sent to the DLQ 
+
+        // The last rollback should cause the 1st message to get sent to the DLQ
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("2nd", m.getText());        
+        assertEquals("2nd", m.getText());
         session.commit();
-        
+
         // We should be able to get the message off the DLQ now.
         m = (TextMessage)dlqConsumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.commit();
-        
+
     }
-    
-    
+
+
     /**
      * @throws Exception
      */
@@ -213,60 +213,110 @@ public class RedeliveryPolicyTest extend
         policy.setUseExponentialBackOff(false);
        //  let's set the maximum redeliveries to no maximum (ie. infinite)
         policy.setMaximumRedeliveries(-1);
-        
-        
+
+
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue("TEST");
         MessageProducer producer = session.createProducer(destination);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
-        
+
         // Send the messages
         producer.send(session.createTextMessage("1st"));
         producer.send(session.createTextMessage("2nd"));
         session.commit();
-           
+
         TextMessage m;
- 
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
-        
+
         //we should be able to get the 1st message redelivered until a session.commit is called
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        session.rollback();           
-        
+        assertEquals("1st", m.getText());
+        session.rollback();
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        session.rollback();  
-        
+        assertEquals("1st", m.getText());
+        session.rollback();
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        session.rollback();  
-        
+        assertEquals("1st", m.getText());
+        session.rollback();
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        session.rollback();  
-        
+        assertEquals("1st", m.getText());
+        session.rollback();
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
-        session.commit();  
-        
+        assertEquals("1st", m.getText());
+        session.commit();
+
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("2nd", m.getText());        
-        session.commit();  
-       
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testMaximumRedeliveryDelay() throws Exception {
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(10);
+        policy.setUseExponentialBackOff(true);
+        policy.setMaximumRedeliveries(-1);
+        policy.setRedeliveryDelay(50);
+        policy.setMaximumRedeliveryDelay(1000);
+        policy.setBackOffMultiplier((short) 2);
+        policy.setUseExponentialBackOff(true);
+
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        session.commit();
+
+        TextMessage m;
+
+        for(int i = 0; i < 10; ++i) {
+            // we should be able to get the 1st message redelivered until a session.commit is called
+            m = (TextMessage)consumer.receive(2000);
+            assertNotNull(m);
+            assertEquals("1st", m.getText());
+            session.rollback();
+        }
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("1st", m.getText());
+        session.commit();
+
+        m = (TextMessage)consumer.receive(2000);
+        assertNotNull(m);
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+        assertTrue(policy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000 );
     }
-    
+
     /**
      * @throws Exception
      */
@@ -278,33 +328,33 @@ public class RedeliveryPolicyTest extend
         policy.setUseExponentialBackOff(false);
         //let's set the maximum redeliveries to 0
         policy.setMaximumRedeliveries(0);
-      
+
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         ActiveMQQueue destination = new ActiveMQQueue("TEST");
         MessageProducer producer = session.createProducer(destination);
-        
+
         MessageConsumer consumer = session.createConsumer(destination);
-        
+
         // Send the messages
         producer.send(session.createTextMessage("1st"));
         producer.send(session.createTextMessage("2nd"));
         session.commit();
-        
+
         TextMessage m;
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("1st", m.getText());        
+        assertEquals("1st", m.getText());
         session.rollback();
-        
+
         //the 1st  message should not be redelivered since maximumRedeliveries is set to 0
         m = (TextMessage)consumer.receive(1000);
         assertNotNull(m);
-        assertEquals("2nd", m.getText());        
-        session.commit();        
-     
-  
-       
+        assertEquals("2nd", m.getText());
+        session.commit();
+
+
+
     }
 
 



Mime
View raw message