activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1300723 - /activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
Date Wed, 14 Mar 2012 21:15:20 GMT
Author: tabish
Date: Wed Mar 14 21:15:20 2012
New Revision: 1300723

URL: http://svn.apache.org/viewvc?rev=1300723&view=rev
Log:
fixes for: https://issues.apache.org/jira/browse/AMQ-3768

Prevent usage of stale cache data.

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1300723&r1=1300722&r2=1300723&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Wed Mar 14
21:15:20 2012
@@ -17,18 +17,19 @@
 package org.apache.kahadb.index;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.ListNode.ListIterator;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.Marshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ListIndex<Key,Value> implements Index<Key,Value> {
 
@@ -120,6 +121,7 @@ public class ListIndex<Key,Value> implem
 
     private ListNode<Key, Value> lastGetNodeCache = null;
     private Map.Entry<Key, Value> lastGetEntryCache = null;
+    private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     synchronized public Value get(Transaction tx, Key key) throws IOException {
@@ -129,6 +131,7 @@ public class ListIndex<Key,Value> implem
             if (key.equals(candidate.getKey())) {
                 this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
                 this.lastGetEntryCache = candidate;
+                this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
                 return candidate.getValue();
             }
         }
@@ -146,7 +149,7 @@ public class ListIndex<Key,Value> implem
 
         Value oldValue = null;
 
-        if (lastGetNodeCache != null) {
+        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
 
             if(lastGetEntryCache.getKey().equals(key)) {
                 oldValue = lastGetEntryCache.setValue(value);
@@ -166,6 +169,8 @@ public class ListIndex<Key,Value> implem
                     return oldValue;
                 }
             }
+        } else {
+            flushCache();
         }
 
         // Not found because the cache wasn't set or its not at the end of the list so we
@@ -207,7 +212,7 @@ public class ListIndex<Key,Value> implem
             return null;
         }
 
-        if (lastGetNodeCache != null) {
+        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
 
             // This searches from the last location of a call to get for the element to remove
             // all the way to the end of the ListIndex.
@@ -216,9 +221,12 @@ public class ListIndex<Key,Value> implem
                 Map.Entry<Key, Value> entry = iterator.next();
                 if (entry.getKey().equals(key)) {
                     iterator.remove();
+                    flushCache();
                     return entry.getValue();
                 }
             }
+        } else {
+            flushCache();
         }
 
         // Not found because the cache wasn't set or its not at the end of the list so we
@@ -229,6 +237,7 @@ public class ListIndex<Key,Value> implem
             Map.Entry<Key, Value> entry = iterator.next();
             if (entry.getKey().equals(key)) {
                 iterator.remove();
+                flushCache();
                 return entry.getValue();
             }
         }
@@ -238,6 +247,7 @@ public class ListIndex<Key,Value> implem
 
     public void onRemove() {
         size.decrementAndGet();
+        flushCache();
     }
 
     public boolean isTransient() {
@@ -251,6 +261,7 @@ public class ListIndex<Key,Value> implem
             // break up the transaction
             tx.commit();
         }
+        flushCache();
         size.set(0);
     }
 
@@ -286,10 +297,14 @@ public class ListIndex<Key,Value> implem
 
     ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
         Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
-        ListNode<Key, Value> node = page.get();
-        node.setPage(page);
-        node.setContainingList(this);
-        return node;
+        try {
+            ListNode<Key, Value> node = page.get();
+            node.setPage(page);
+            node.setContainingList(this);
+            return node;
+        } catch (ClassCastException e) {
+            throw e;
+        }
     }
 
     ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws
IOException {
@@ -354,5 +369,6 @@ public class ListIndex<Key,Value> implem
     private void flushCache() {
         this.lastGetEntryCache = null;
         this.lastGetNodeCache = null;
+        this.lastCacheTxSrc.clear();
     }
 }



Mime
View raw message