activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r988455 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/QueueSubscription.java test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
Date Tue, 24 Aug 2010 09:50:25 GMT
Author: gtully
Date: Tue Aug 24 09:50:25 2010
New Revision: 988455

URL: http://svn.apache.org/viewvc?rev=988455&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2876 - ensure an acked message is not
expired when processing the ack

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=988455&r1=988454&r2=988455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Aug 24 09:50:25 2010
@@ -50,12 +50,11 @@ public class QueueSubscription extends P
         final Queue queue = (Queue)q;
         
         if (n.isExpired()) {
-            if (broker.isExpired(n)) {
-                queue.messageExpired(context, this, node);
-            } else {
-                LOG.debug("ignoring ack " + ack + ", for already expired message: " + n);
+            // sync with message expiry processing
+            if (!broker.isExpired(n)) {
+                LOG.warn("ignoring ack " + ack + ", for already expired message: " + n);
+                return;
             }
-            return;
         }
         queue.removeMessage(context, this, node, ack);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java?rev=988455&r1=988454&r2=988455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java Tue Aug 24 09:50:25 2010
@@ -17,6 +17,8 @@
 package org.apache.activemq;
 
 import java.util.Date;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -107,6 +109,58 @@ public class JmsSendReceiveWithMessageEx
         assertNull(consumer.receive(1000));
     }
 
+     public void testConsumeExpiredQueueAndDlq() throws Exception {
+
+         MessageProducer producerNormal = createProducer(0);
+         MessageProducer producerExpire = createProducer(500);
+
+         consumerDestination = session.createQueue("ActiveMQ.DLQ");
+         MessageConsumer dlqConsumer = createConsumer();
+
+         consumerDestination = session.createQueue(getConsumerSubject());
+         producerDestination = session.createQueue(getProducerSubject());
+
+
+         Connection consumerConnection = createConnection();
+         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+         prefetchPolicy.setAll(10);
+         ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy);
+         Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
+         consumerConnection.start();
+         connection.start();
+
+         String msgBody = new String(new byte[20*1024]);
+         for (int i = 0; i < data.length; i++) {
+             Message message = session.createTextMessage(msgBody);
+             producerExpire.send(producerDestination, message);
+         }
+
+         for (int i = 0; i < data.length; i++) {
+             Message message = session.createTextMessage(msgBody);
+             producerNormal.send(producerDestination, message);
+         }
+
+         Vector<Message> messages = new Vector<Message>();
+         Message received;
+         while ((received = consumer.receive(1000)) != null) {
+             messages.add(received);
+             if (messages.size() == 1) {
+                TimeUnit.SECONDS.sleep(1);
+             }
+             received.acknowledge();
+         };
+
+         assertEquals("got messages", messageCount + 1, messages.size());
+
+         Vector<Message> dlqMessages = new Vector<Message>();
+         while ((received = dlqConsumer.receive(1000)) != null) {
+             dlqMessages.add(received);
+         };
+
+         assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+    }
+    
     /**
      * Sends and consumes the messages to a queue destination.
      * 



Mime
View raw message