activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r397915 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java main/java/org/apache/activemq/ActiveMQSessionExecutor.java test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Date Fri, 28 Apr 2006 15:11:43 GMT
Author: chirino
Date: Fri Apr 28 08:11:39 2006
New Revision: 397915

URL: http://svn.apache.org/viewcvs?rev=397915&view=rev
Log:
Applied Rodrigo S de Castro's latest unit test patch and fixed the redelivery problem.  Redelivery
was not being delayed when rollback was called from the message listener.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri Apr 28 08:11:39 2006
@@ -788,7 +788,7 @@
         MessageListener listener = this.messageListener;
         try {
             if (!unconsumedMessages.isClosed()) {
-                if (listener != null && started.get()) {
+                if (listener != null && unconsumedMessages.isRunning() ) {
                     ActiveMQMessage message = createActiveMQMessage(md);
                     beforeMessageIsConsumed(md);
                     try {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Fri Apr 28 08:11:39 2006
@@ -147,7 +147,7 @@
             return false;
         } else {
             dispatch(message);
-            return true;
+            return messageQueue.isRunning();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Fri Apr 28 08:11:39 2006
@@ -35,7 +35,8 @@
 
 public class MessageListenerRedeliveryTest extends TestCase {
 
-    private static final Log log = LogFactory.getLog(MessageListenerRedeliveryTest.class);
+    private Log log = LogFactory.getLog(getClass());
+
     private Connection connection;
 
     protected void setUp() throws Exception {
@@ -55,8 +56,8 @@
     protected RedeliveryPolicy getRedeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         redeliveryPolicy.setInitialRedeliveryDelay(1000);
-        redeliveryPolicy.setBackOffMultiplier((short) 5);
-        redeliveryPolicy.setMaximumRedeliveries(10);
+        redeliveryPolicy.setMaximumRedeliveries(2);
+        redeliveryPolicy.setBackOffMultiplier((short) 2);
         redeliveryPolicy.setUseExponentialBackOff(true);
         return redeliveryPolicy;
     }
@@ -67,59 +68,99 @@
         return factory.createConnection();
     }
 
-    private class ConsumerMessageListenerTest implements MessageListener {
-        private ActiveMQMessageConsumer consumer;
+    private class TestMessageListener implements MessageListener {
+        private Session session;
+
         public int counter = 0;
 
-        public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
-            this.consumer = consumer;
+        public TestMessageListener(Session session) {
+            this.session = session;
         }
 
         public void onMessage(Message message) {
             try {
-                log.info("Message: " + message);
+                log.info("Message Received: " + message);
                 counter++;
-                if (counter <= 2) {
-                    log.info("ROLLBACK");
-                    consumer.rollback();
+                if (counter <= 3) {
+                    log.info("Message Rollback.");
+                    session.rollback();
                 } else {
-                    log.info("COMMIT");
+                    log.info("Message Commit.");
                     message.acknowledge();
-                    consumer.commit();
+                    session.commit();
                 }
             } catch (JMSException e) {
-                System.err.println("Error when rolling back transaction");
+                log.error("Error when rolling back transaction");
             }
         }
     }
 
-    private class SessionMessageListenerTest implements MessageListener {
-        private Session session;
-        public int counter = 0;
+    public void testQueueRollbackConsumerListener() throws JMSException {
+        connection.start();
+
+        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+        session.commit();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
+        mc.setRedeliveryPolicy(getRedeliveryPolicy());
+
+        TestMessageListener listener = new TestMessageListener(session);
+        consumer.setMessageListener(listener);
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
 
-        public SessionMessageListenerTest(Session session) {
-            this.session = session;
         }
+        // first try
+        assertEquals(1, listener.counter);
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
 
-        public void onMessage(Message message) {
-            try {
-                log.info("Message: " + message);
-                counter++;
-                if (counter < 2) {
-                    log.info("ROLLBACK");
-                    session.rollback();
-                } else {
-                    log.info("COMMIT");
-                    message.acknowledge();
-                    session.commit();
-                }
-            } catch (JMSException e) {
-                System.err.println("Error when rolling back transaction");
-            }
         }
+        // second try (redelivery after 1 sec)
+        assertEquals(2, listener.counter);
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+
+        }
+        // third try (redelivery after 2 seconds) - it should give up after that
+        assertEquals(3, listener.counter);
+
+        // create new message
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // it should be committed, so no redelivery
+        assertEquals(4, listener.counter);
+
+        try {
+            Thread.sleep(1500);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // no redelivery, counter should still be 4
+        assertEquals(4, listener.counter);
+
+        session.close();
     }
 
-    public void testQueueRollbackMessageListener() throws JMSException {
+    public void testQueueRollbackSessionListener() throws JMSException {
         connection.start();
 
         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -134,25 +175,52 @@
         ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
         mc.setRedeliveryPolicy(getRedeliveryPolicy());
 
-        SessionMessageListenerTest listener = new SessionMessageListenerTest(session);
+        TestMessageListener listener = new TestMessageListener(session);
         consumer.setMessageListener(listener);
 
         try {
-            Thread.sleep(7000);
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+
+        }
+        // first try
+        assertEquals(1, listener.counter);
+
+        try {
+            Thread.sleep(1000);
         } catch (InterruptedException e) {
 
         }
+        // second try (redelivery after 1 sec)
         assertEquals(2, listener.counter);
 
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+
+        }
+        // third try (redelivery after 2 seconds) - it should give up after that
+        assertEquals(3, listener.counter);
+
+        // create new message
         producer.send(createTextMessage(session));
         session.commit();
 
         try {
-            Thread.sleep(2000);
+            Thread.sleep(500);
         } catch (InterruptedException e) {
             // ignore
         }
-        assertEquals(3, listener.counter);
+        // it should be committed, so no redelivery
+        assertEquals(4, listener.counter);
+
+        try {
+            Thread.sleep(1500);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // no redelivery, counter should still be 4
+        assertEquals(4, listener.counter);
 
         session.close();
     }



Mime
View raw message