activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r985155 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/activemq/store/memory/ test/java/org/apache/activemq/broker/region/cursors/
Date Fri, 13 Aug 2010 11:02:35 GMT
Author: gtully
Date: Fri Aug 13 11:02:34 2010
New Revision: 985155

URL: http://svn.apache.org/viewvc?rev=985155&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2868 - make it bullit proof with a transaction
completion sync that serializes updates to a cusror and store so that they are always in order

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Fri Aug 13 11:02:34 2010
@@ -259,7 +259,10 @@ public class KahaDBTransactionStore impl
 
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
-                theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit,
postCommit);
+                // ensure message order w.r.t to cursor and store for setBatch()
+                synchronized (this) {
+                    theStore.store(new KahaCommitCommand().setTransactionInfo(info), true,
preCommit, postCommit);
+                }
             }
         }else {
            LOG.error("Null transaction passed on commit");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Fri Aug 13 11:02:34 2010
@@ -217,11 +217,13 @@ public class MemoryTransactionStore impl
             }
             return;
         }
-        tx.commit();
-        if (postCommit != null) {
-            postCommit.run();
+        // ensure message order w.r.t to cursor and store for setBatch()
+        synchronized (this) {
+            tx.commit();
+            if (postCommit != null) {
+                postCommit.run();
+            }
         }
-
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Fri Aug 13 11:02:34 2010
@@ -41,6 +41,7 @@ import javax.management.ObjectName;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -77,7 +78,7 @@ import org.apache.commons.logging.LogFac
  * 2) transacted
  * 
  */
-public class NegativeQueueTest extends TestCase {
+public class NegativeQueueTest extends AutoFailTestSupport {
     private static final Log LOG = LogFactory.getLog(NegativeQueueTest.class);
     
     public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
@@ -92,7 +93,7 @@ public class NegativeQueueTest extends T
     private static final int MESSAGE_COUNT = 2000;  
     
     protected static final boolean TRANSACTED = true;
-    protected static final boolean DEBUG = false;
+    protected static final boolean DEBUG = true;
     protected static int NUM_CONSUMERS = 20;    
     protected static int PREFETCH_SIZE = 1000;  
     
@@ -211,7 +212,25 @@ public class NegativeQueueTest extends T
             consumer.setMessageListener(new SessionAwareMessageListener(consumerSession,
latch2, consumerList2));
         }
         
-        assertTrue("got all expected messages on 2", latch2.await(300000, TimeUnit.MILLISECONDS));
+        boolean success = Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                boolean done = latch2.await(10, TimeUnit.SECONDS);
+                if(DEBUG){
+                    System.out.println("");
+                    System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize());
+                    System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage());
+                    System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
+                    System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage());
+                    System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit());
+                }
+                return done;
+            }
+        }, 300 * 1000);
+        if (!success) {
+            dumpAllThreads("blocked waiting on 2");
+        }
+        assertTrue("got all expected messages on 2", success);
+
         producerConnection.close();
         for(int ix=0; ix<NUM_CONSUMERS; ix++){
             consumerConnections1[ix].close();



Mime
View raw message