activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r696370 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/PrefetchSubscription.java transaction/Transaction.java
Date Wed, 17 Sep 2008 16:59:43 GMT
Author: chirino
Date: Wed Sep 17 09:59:42 2008
New Revision: 696370

URL: http://svn.apache.org/viewvc?rev=696370&view=rev
Log:
This fixes the recent errors the test cases have been seeing with transacted acks due to the
new ack assertion bits added.
We now take the message out of the dispatch list when the ack is received, but we put it back
on a rollback.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Sep 17 09:59:42 2008
@@ -197,6 +197,7 @@
                     }
                     if (inAckRange) {
                         // Don't remove the nodes until we are committed.
+                        removeList.add(node);
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             if (!this.getConsumerInfo().isBrowser()) {
@@ -205,7 +206,6 @@
                             if (!isSlave()) {
                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                             }
-                            removeList.add(node);
                         } else {
                             // setup a Synchronization to remove nodes from the
                             // dispatched list.
@@ -215,9 +215,7 @@
                                         public void afterCommit()
                                                 throws Exception {
                                             synchronized(dispatchLock) {
-                                            
                                                 dequeueCounter++;
-                                                dispatched.remove(node);
                                                 node
                                                         .getRegionDestination()
                                                         .getDestinationStatistics()
@@ -234,9 +232,11 @@
                                             }
                                         }
 
-                                        public void afterRollback()
-                                                throws Exception {
-                                            super.afterRollback();
+                                        public void afterRollback() throws Exception {
+                                        	// Need to put it back in the front.
+                                            synchronized(dispatchLock) {
+                                        	    dispatched.add(0, node);
+                                            }
                                         }
                                     });
                         }
@@ -426,12 +426,16 @@
 		boolean checkFoundStart = false;
 		boolean checkFoundEnd = false;
 		for (MessageReference node : dispatched) {
-			if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId()))
{
+			
+			if( firstAckedMsg == null ) {
+				checkFoundStart=true;
+			} else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
 				checkFoundStart = true;
 			}
 
-			if (checkFoundStart || firstAckedMsg == null)
+			if (checkFoundStart) {
 				checkCount++;
+			}
 
 			if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
 				checkFoundEnd = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
Wed Sep 17 09:59:42 2008
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 
 import javax.transaction.xa.XAException;
@@ -88,6 +89,7 @@
     }
 
     public void fireAfterRollback() throws Exception {
+    	Collections.reverse(synchronizations);
         for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();)
{
             Synchronization s = iter.next();
             s.afterRollback();



Mime
View raw message