activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r818487 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Date Thu, 24 Sep 2009 14:01:54 GMT
Author: chirino
Date: Thu Sep 24 14:01:53 2009
New Revision: 818487

URL: http://svn.apache.org/viewvc?rev=818487&view=rev
Log:
AMQ-2401: Applying patch which makes the DUPS_OK on Queue case use the same ack strategy that
is used by AUTO_ACK.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=818487&r1=818486&r2=818487&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Sep 24 14:01:53 2009
@@ -238,6 +238,14 @@
         }
     }
 
+    private boolean isAutoAcknowledgeEach() {
+        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
+    }
+
+    private boolean isAutoAcknowledgeBatch() {
+        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
+    }
+
     public StatsImpl getStats() {
         return stats;
     }
@@ -642,7 +650,7 @@
     void deliverAcks() {
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
-            if (session.isAutoAcknowledge()) {
+            if (isAutoAcknowledgeEach()) {
                 synchronized(deliveredMessages) {
                     ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                     if (ack != null) {
@@ -687,7 +695,7 @@
             // Ack any delivered messages now.
             if (!session.getTransacted()) { 
                 deliverAcks();
-                if (session.isDupsOkAcknowledge()) {
+                if (isAutoAcknowledgeBatch()) {
                     acknowledge();
                 }
             }
@@ -771,7 +779,7 @@
     private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
         md.setDeliverySequenceId(session.getNextDeliveryId());
         lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
-        if (!session.isDupsOkAcknowledge()) {
+        if (!isAutoAcknowledgeBatch()) {
             synchronized(deliveredMessages) {
                 deliveredMessages.addFirst(md);
             }
@@ -795,7 +803,7 @@
             stats.onMessage();
             if (session.getTransacted()) {
                 // Do nothing.
-            } else if (session.isAutoAcknowledge()) {
+            } else if (isAutoAcknowledgeEach()) {
                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                     synchronized (deliveredMessages) {
                         if (!deliveredMessages.isEmpty()) {
@@ -820,7 +828,7 @@
                     }
                     deliveryingAcknowledgements.set(false);
                 }
-            } else if (session.isDupsOkAcknowledge()) {
+            } else if (isAutoAcknowledgeBatch()) {
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
@@ -1081,7 +1089,7 @@
                                 }
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
-                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
+                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                     // Redeliver the message
                                 } else {
                                     // Transacted or Client ack: Deliver the



Mime
View raw message