activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1310410 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Date Fri, 06 Apr 2012 15:50:03 GMT
Author: gtully
Date: Fri Apr  6 15:50:03 2012
New Revision: 1310410

URL: http://svn.apache.org/viewvc?rev=1310410&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3775 - ensure no concurrent kahadb transaction in topic store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1310410&r1=1310409&r2=1310410&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Apr  6 15:50:03 2012
@@ -456,11 +456,11 @@ public class KahaDBStore extends Message
             // operations... but for now we must
             // externally synchronize...
             Location location;
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 location = findMessageLocation(key, dest);
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
             if (location == null) {
                 return null;
@@ -472,7 +472,7 @@ public class KahaDBStore extends Message
         public int getMessageCount() throws IOException {
             try {
                 lockAsyncJobQueue();
-                indexLock.readLock().lock();
+                indexLock.writeLock().lock();
                 try {
                     return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                         public Integer execute(Transaction tx) throws IOException {
@@ -490,7 +490,7 @@ public class KahaDBStore extends Message
                         }
                     });
                 }finally {
-                    indexLock.readLock().unlock();
+                    indexLock.writeLock().unlock();
                 }
             } finally {
                 unlockAsyncJobQueue();
@@ -499,7 +499,7 @@ public class KahaDBStore extends Message
 
         @Override
         public boolean isEmpty() throws IOException {
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
                     public Boolean execute(Transaction tx) throws IOException {
@@ -510,7 +510,7 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 
@@ -540,7 +540,7 @@ public class KahaDBStore extends Message
 
 
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
@@ -564,12 +564,13 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 
         public void resetBatching() {
             if (pageFile.isLoaded()) {
+                indexLock.writeLock().lock();
                 try {
                     pageFile.tx().execute(new Transaction.Closure<Exception>() {
                         public void execute(Transaction tx) throws Exception {
@@ -580,6 +581,8 @@ public class KahaDBStore extends Message
                         });
                 } catch (Exception e) {
                     LOG.error("Failed to reset batching",e);
+                }finally {
+                    indexLock.writeLock().unlock();
                 }
             }
         }
@@ -736,7 +739,7 @@ public class KahaDBStore extends Message
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
 
             final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
@@ -752,7 +755,7 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
 
             SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
@@ -762,7 +765,7 @@ public class KahaDBStore extends Message
 
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                     public SubscriptionInfo execute(Transaction tx) throws IOException {
@@ -776,7 +779,7 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 
@@ -933,7 +936,7 @@ public class KahaDBStore extends Message
     public Set<ActiveMQDestination> getDestinations() {
         try {
             final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
@@ -960,7 +963,7 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
             return rc;
         } catch (IOException e) {



Mime
View raw message