activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r884633 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Date Thu, 26 Nov 2009 16:46:39 GMT
Author: dejanb
Date: Thu Nov 26 16:46:38 2009
New Revision: 884633

URL: http://svn.apache.org/viewvc?rev=884633&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2515 - Optimized Acknowledgements and interrupted
transport

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=884633&r1=884632&r2=884633&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Nov 26 16:46:38 2009
@@ -134,6 +134,9 @@
     private long lastDeliveredSequenceId;
 
     private IOException failureError;
+    
+    private long optimizeAckTimestamp = System.currentTimeMillis();
+    private long optimizeAckTimeout = 300;
 
     /**
      * Create a MessageConsumer
@@ -788,7 +791,7 @@
             }
         }
     }
-
+    
     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws
JMSException {
         if (unconsumedMessages.isClosed()) {
             return;
@@ -809,12 +812,13 @@
                         if (!deliveredMessages.isEmpty()) {
                             if (optimizeAcknowledge) {
                                 ackCounter++;
-                                if (ackCounter >= (info.getCurrentPrefetchSize() * .65))
{
+                                if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis()
>= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                 	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                 	if (ack != null) {
                             		    deliveredMessages.clear();
                             		    ackCounter = 0;
                             		    session.sendAck(ack);
+                            		    optimizeAckTimestamp = System.currentTimeMillis();
                                 	}
                                 }
                             } else {
@@ -1074,14 +1078,13 @@
                             session.connection.rollbackDuplicate(this, old.getMessage());
                         }
                     }
-                    if (pendingAck != null && pendingAck.isDeliveredAck()) {
-                        // on resumption a pending delivered ack will be out of sync with
-                        // re deliveries.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("removing pending delivered ack on transport interupt:
" + pendingAck);
-                        }   
-                        pendingAck = null;
+                    if (!session.isTransacted()) {
+                        // clean, so we don't have duplicates with optimizeAcknowledge 
+                        synchronized (deliveredMessages) {
+                            deliveredMessages.clear();
+                        }
                     }
+                    pendingAck = null;
                 }
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
{



Mime
View raw message