activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1039392 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/TopicSubscription.java test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
Date Fri, 26 Nov 2010 14:45:17 GMT
Author: gtully
Date: Fri Nov 26 14:45:17 2010
New Revision: 1039392

URL: http://svn.apache.org/viewvc?rev=1039392&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-3056 - do not throw on redelivery ack
for a topic

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1039392&r1=1039391&r2=1039392&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	Fri Nov 26 14:45:17 2010
@@ -282,6 +282,9 @@ public class TopicSubscription extends A
             dequeueCounter.addAndGet(ack.getMessageCount());
             dispatchMatched();
             return;
+        } else if (ack.isRedeliveredAck()) {
+            // nothing to do atm
+            return;
         }
         throw new JMSException("Invalid acknowledgment: " + ack);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java?rev=1039392&r1=1039391&r2=1039392&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java	Fri Nov 26 14:45:17 2010
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -28,12 +31,15 @@ import javax.jms.Topic;
 
 import org.apache.activemq.test.TestSupport;
 import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.1.1.1 $
  */
 public class TopicRedeliverTest extends TestSupport {
 
+    private static final Log LOG = LogFactory.getLog(TopicRedeliverTest.class);
     private static final int RECEIVE_TIMEOUT = 10000;
 
     protected int deliveryMode = DeliveryMode.PERSISTENT;
@@ -141,6 +147,51 @@ public class TopicRedeliverTest extends 
         connection.close();
     }
 
+    public void testNoExceptionOnRedeliveryAckWithSimpleTopicConsumer() throws Exception {
+        Destination destination = createDestination(getClass().getName());
+        Connection connection = createConnection();
+        final AtomicBoolean gotException = new AtomicBoolean();
+        connection.setExceptionListener(new ExceptionListener() {
+            public void onException(JMSException exception) {
+                LOG.error("unexpected ex:" + exception);
+                    gotException.set(true);
+            }
+        });
+        connection.setClientID(idGen.generateId());
+        connection.start();
+        Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = null;
+        if (topic) {
+            consumer = consumerSession.createConsumer((Topic)destination);
+        } else {
+            consumer = consumerSession.createConsumer(destination);
+        }
+        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+
+        TextMessage sentMsg = producerSession.createTextMessage();
+        sentMsg.setText("msg1");
+        producer.send(sentMsg);
+        producerSession.commit();
+
+        Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
+        assertFalse(recMsg.getJMSRedelivered());
+        recMsg = consumer.receive(RECEIVE_TIMEOUT);
+        consumerSession.rollback();
+        recMsg = consumer.receive(RECEIVE_TIMEOUT);
+        assertTrue(recMsg.getJMSRedelivered());
+        consumerSession.rollback();
+        recMsg = consumer.receive(RECEIVE_TIMEOUT);
+        assertTrue(recMsg.getJMSRedelivered());
+        consumerSession.commit();
+        assertTrue(recMsg.equals(sentMsg));
+        assertTrue(recMsg.getJMSRedelivered());
+        connection.close();
+
+        assertFalse("no exception", gotException.get());
+    }
+
     /**
      * Check a session is rollbacked on a Session close();
      * 



Mime
View raw message