activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r916936 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Date Sat, 27 Feb 2010 10:07:05 GMT
Author: gtully
Date: Sat Feb 27 10:07:05 2010
New Revision: 916936

URL: http://svn.apache.org/viewvc?rev=916936&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-906 - have retries with autoack and
runtime exception from onMessage 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=916936&r1=916935&r2=916936&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Sat Feb 27 10:07:05 2010
@@ -650,6 +650,7 @@
     void doClose() throws JMSException {
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
+        LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
     }
@@ -1205,14 +1206,15 @@
                                 }
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
+                                LOG.error(getConsumerId() + " Exception while processing
message: " + md.getMessage().getMessageId(), e);
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() ||
session.isIndividualAcknowledge()) {
-                                    // Redeliver the message
+                                    // schedual redelivery and possible dlq processing
+                                    rollback();
                                 } else {
                                     // Transacted or Client ack: Deliver the
                                     // next message.
                                     afterMessageIsConsumed(md, false);
                                 }
-                                LOG.error(getConsumerId() + " Exception while processing
message: " + e, e);
                             }
                         } else {
                             unconsumedMessages.enqueue(md);
@@ -1328,14 +1330,7 @@
         if (listener != null) {
             MessageDispatch md = unconsumedMessages.dequeueNoWait();
             if (md != null) {
-                try {
-                    ActiveMQMessage message = createActiveMQMessage(md);
-                    beforeMessageIsConsumed(md);
-                    listener.onMessage(message);
-                    afterMessageIsConsumed(md, false);
-                } catch (JMSException e) {
-                    session.connection.onClientInternalException(e);
-                }
+                dispatch(md);
                 return true;
             }
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=916936&r1=916935&r2=916936&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Sat Feb 27 10:07:05 2010
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq;
 
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -29,6 +34,9 @@
 import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -227,6 +235,97 @@
         session.close();
     }
 
+    public void testQueueSessionListenerExceptionRetry() throws  Exception {
+        connection.start();
+            
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session, "1");
+        producer.send(message);
+        message = createTextMessage(session, "2");
+        producer.send(message);
+            
+        
+        MessageConsumer consumer = session.createConsumer(queue);
+            
+        final CountDownLatch gotMessage = new CountDownLatch(2);
+        final AtomicInteger count  = new AtomicInteger(0);
+        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
+        final ArrayList<String> received = new ArrayList<String>();
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                LOG.info("Message Received: " + message);
+                try {
+                    received.add(((TextMessage) message).getText());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+                if (count.incrementAndGet() < maxDeliveries) {
+                    throw new RuntimeException(getName() + " force a redelivery");
+                }
+                // new blood
+                count.set(0);
+                gotMessage.countDown();
+            }
+        });
+            
+        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
+        
+        for (int i=0; i<maxDeliveries; i++) {
+            assertEquals("got first redelivered: " + i, "1", received.get(i));
+        }
+        for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
+            assertEquals("got first redelivered: " + i, "2", received.get(i));
+        }
+        session.close();
+    }
+        
+
+    public void testQueueSessionListenerExceptionDlq() throws  Exception {
+        connection.start();
+            
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+
+        ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
+        MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
+        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
+        dlqConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                gotDlqMessage.countDown();
+            }
+        });
+
+        MessageConsumer consumer = session.createConsumer(queue);
+            
+        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
+        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
+        
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                LOG.info("Message Received: " + message);
+                gotMessage.countDown();
+                throw new RuntimeException(getName() + " force a redelivery");
+            }
+        });
+            
+        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
+        
+        // check DLQ
+        assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
+        
+        session.close();
+    }
+    
+
+    private TextMessage createTextMessage(Session session, String text) throws JMSException
{
+        return session.createTextMessage(text);
+    }
     private TextMessage createTextMessage(Session session) throws JMSException {
         return session.createTextMessage("Hello");
     }



Mime
View raw message