activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r788646 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/amq/AMQTransactionStore.java test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
Date Fri, 26 Jun 2009 10:09:24 GMT
Author: gtully
Date: Fri Jun 26 10:09:24 2009
New Revision: 788646

URL: http://svn.apache.org/viewvc?rev=788646&view=rev
Log:
fix sync of getTx in transaction Store, could result in extra uncommitted tx hanging about
under high load

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=788646&r1=788645&r2=788646&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
Fri Jun 26 10:09:24 2009
@@ -18,12 +18,11 @@
 package org.apache.activemq.store.amq;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
+
+import javax.transaction.xa.XAException;
 
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
@@ -39,7 +38,7 @@
  */
 public class AMQTransactionStore implements TransactionStore {
 
-    Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId,
AMQTx>();
+    protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId,
AMQTx>();
     Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId,
AMQTx>();
 
     private final AMQPersistenceAdapter peristenceAdapter;
@@ -88,10 +87,10 @@
         AMQTx tx = null;
         synchronized (inflightTransactions) {
             tx = inflightTransactions.get(txid);
-        }
-        if (tx == null) {
-            tx = new AMQTx(location);
-            inflightTransactions.put(txid, tx);
+            if (tx == null) {
+                tx = new AMQTx(location);
+                inflightTransactions.put(txid, tx);
+            }
         }
         return tx;
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java?rev=788646&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
Fri Jun 26 10:09:24 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.amq.AMQTransactionStore;
+import org.apache.activemq.store.amq.AMQTx;
+
+/**
+ * Calls AMQTransactionStore.getTx from MAX_THREADS pooled tasks, MAX_TX ids each, then asserts
+ * exactly MAX_TX in-flight entries exist. NOTE(review): awaitTermination's result is unused.
+ * @version $Revision$
+ */
+public class TransactionStoreTest extends TestCase {
+
+    protected static final int MAX_TX = 2500;
+    protected static final int MAX_THREADS = 200;
+    
+    class UnderTest extends AMQTransactionStore {
+        public UnderTest() {
+            super(null);
+        }
+        public Map<TransactionId, AMQTx>  getInFlight() {
+         return inflightTransactions;   
+        }
+    };
+    
+    UnderTest underTest = new UnderTest();
+    
+  public void testConcurrentGetTx() throws Exception {
+      final ConnectionId connectionId = new ConnectionId("1:1");
+      
+      Runnable getTx = new Runnable() {
+          
+        public void run() {
+            for (int i=0; i<MAX_TX;i++) {
+                TransactionId txid = new LocalTransactionId(connectionId, i);
+                underTest.getTx(txid, null);
+            }
+        }
+      };
+      
+      ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
+      for (int i=0;i < MAX_THREADS; i++) {
+          executor.execute(getTx);
+      }
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+      
+      assertEquals("has just the right amount of transactions", MAX_TX, underTest.getInFlight().size());
+  }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/TransactionStoreTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message