activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1153420 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/ ka...
Date Wed, 03 Aug 2011 10:18:20 GMT
Author: gtully
Date: Wed Aug  3 10:18:18 2011
New Revision: 1153420

URL: http://svn.apache.org/viewvc?rev=1153420&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3434: Contention in PList creation results in NPE on load - FilePendingMessageCursor. Resolve contention on creation, tidy up ListIndex iterator remove and plist release, additional test that stresses contention such that it can reproduce the stomp load test scenario

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Aug  3 10:18:18 2011
@@ -133,6 +133,9 @@ public class FilePendingMessageCursor ex
     @Override
     public synchronized void release() {
         iterating = false;
+        if (iter instanceof DiskIterator) {
+           ((DiskIterator)iter).release();
+        };
         if (flushRequired) {
             flushRequired = false;
             if (!hasSpace()) {
@@ -417,7 +420,7 @@ public class FilePendingMessageCursor ex
     }
 
     protected synchronized void flushToDisk() {
-        if (!memoryList.isEmpty()) {
+        if (!memoryList.isEmpty() && store != null) {
             long start = 0;
              if (LOG.isTraceEnabled()) {
                 start = System.currentTimeMillis();
@@ -483,7 +486,7 @@ public class FilePendingMessageCursor ex
     }
 
     final class DiskIterator implements Iterator<MessageReference> {
-        private final Iterator<PListEntry> iterator;
+        private final PList.PListIterator iterator;
         DiskIterator() {
             try {
                 iterator = getDiskList().iterator();
@@ -510,5 +513,8 @@ public class FilePendingMessageCursor ex
             iterator.remove();
         }
 
+        public void release() {
+            iterator.release();
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Wed Aug  3 10:18:18 2011
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kahadb.index.ListIndex;
-import org.apache.kahadb.index.ListNode;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.ByteSequence;
@@ -58,11 +57,11 @@ public class PList extends ListIndex<Str
     }
 
     void read(DataInput in) throws IOException {
-        this.headPageId = in.readLong();
+        setHeadPageId(in.readLong());
     }
 
     public void write(DataOutput out) throws IOException {
-        out.writeLong(this.headPageId);
+        out.writeLong(getHeadPageId());
     }
 
     public synchronized void destroy() throws IOException {
@@ -185,17 +184,19 @@ public class PList extends ListIndex<Str
         return size() == 0;
     }
 
-    synchronized public Iterator<PListEntry> iterator() throws IOException {
+    public PListIterator iterator() throws IOException {
         return new PListIterator();
     }
 
-    private final class PListIterator implements Iterator<PListEntry> {
+    public final class PListIterator implements Iterator<PListEntry> {
         final Iterator<Map.Entry<String, Location>> iterator;
         final Transaction tx;
 
         PListIterator() throws IOException {
             tx = store.pageFile.tx();
-            this.iterator = iterator(tx);
+            synchronized (indexLock) {
+                this.iterator = iterator(tx);
+            }
         }
 
         @Override
@@ -234,6 +235,16 @@ public class PList extends ListIndex<Str
                 throw e;
             }
         }
+
+        public void release() {
+            try {
+                tx.rollback();
+            } catch (IOException unexpected) {
+                IllegalStateException e = new IllegalStateException(unexpected);
+                e.initCause(unexpected);
+                throw e;
+            }
+        }
     }
 
     public void claimFileLocations(final Set<Integer> candidates) throws IOException {
@@ -254,6 +265,6 @@ public class PList extends ListIndex<Str
 
     @Override
     public String toString() {
-        return "" + name + ",[headPageId=" + headPageId  + ",tailPageId=" + tailPageId + ", size=" + size() + "]";
+        return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Wed Aug  3 10:18:18 2011
@@ -76,6 +76,7 @@ public class PListStore extends ServiceS
     private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
     private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
     private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    private boolean indexEnablePageCaching = true;
 
     public Object getIndexLock() {
         return indexLock;
@@ -110,6 +111,14 @@ public class PListStore extends ServiceS
         this.indexWriteBatchSize = indexWriteBatchSize;
     }
 
+    public boolean getIndexEnablePageCaching() {
+        return indexEnablePageCaching;
+    }
+
+    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
+        this.indexEnablePageCaching = indexEnablePageCaching;
+    }
+
     protected class MetaData {
         protected MetaData(PListStore store) {
             this.store = store;
@@ -223,10 +232,10 @@ public class PListStore extends ServiceS
                     result = pl;
                     this.persistentLists.put(name, pl);
                 }
-                final PList load = result;
+                final PList toLoad = result;
                 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        load.load(tx);
+                        toLoad.load(tx);
                     }
                 });
 
@@ -269,6 +278,7 @@ public class PListStore extends ServiceS
                 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
                 this.journal.start();
                 this.pageFile = new PageFile(directory, "tmpDB");
+                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
                 this.pageFile.setPageSize(getIndexPageSize());
                 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
                 this.pageFile.setPageCacheSize(getIndexCacheSize());
@@ -340,12 +350,21 @@ public class PListStore extends ServiceS
 
     public void run() {
         try {
+            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
             final Set<Integer> candidates = journal.getFileMap().keySet();
             LOG.trace("Full gc candidate set:" + candidates);
             if (candidates.size() > 1) {
+                // prune current write
+                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
+                    if (iterator.next() >= lastJournalFileId) {
+                        iterator.remove();
+                    }
+                }
                 List<PList> plists = null;
-                synchronized (this) {
-                    plists = new ArrayList(persistentLists.values());
+                synchronized (indexLock) {
+                    synchronized (this) {
+                        plists = new ArrayList(persistentLists.values());
+                    }
                 }
                 for (PList list : plists) {
                     list.claimFileLocations(candidates);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Wed Aug  3 10:18:18 2011
@@ -18,11 +18,11 @@ package org.apache.activemq.store.kahadb
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -44,8 +44,9 @@ public class PListTest {
     private PListStore store;
     private PList plist;
     final ByteSequence payload = new ByteSequence(new byte[400]);
-    final String idSeed = new String("Seed");
+    final String idSeed = new String("Seed" + new byte[1024]);
     final Vector<Throwable> exceptions = new Vector<Throwable>();
+    ExecutorService executor;
    
 
     @Test
@@ -146,7 +147,18 @@ public class PListTest {
         assertFalse(plist.remove("doesNotExist"));
     }
     
-    
+
+    @Test
+    public void testRemoveSingleEntry() throws Exception {
+        plist.addLast("First", new ByteSequence("A".getBytes()));
+
+        Iterator<PListEntry> iterator = plist.iterator();
+        while (iterator.hasNext()) {
+            PListEntry v = iterator.next();
+            iterator.remove();
+        }
+    }
+
     @Test
     public void testRemoveSecondPosition() throws Exception {
         plist.addLast("First", new ByteSequence("A".getBytes()));
@@ -154,7 +166,7 @@ public class PListTest {
         
         assertTrue(plist.remove(1));
         assertTrue(plist.remove(0));
-        assertFalse(plist.remove(3));
+        assertFalse(plist.remove(0));
     }
 
 
@@ -165,36 +177,47 @@ public class PListTest {
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
         store = new PListStore();
+        store.setCleanupInterval(400);
         store.setDirectory(directory);
         store.setJournalMaxFileLength(1024*5);
         store.start();
 
-        final ByteSequence payload = new ByteSequence(new byte[1024*4]);
+        final ByteSequence payload = new ByteSequence(new byte[1024*2]);
 
 
         final Vector<Throwable> exceptions = new Vector<Throwable>();
-        final int iterations = 1000;
+        final int iterations = 5000;
         final int numLists = 10;
 
         final PList[] lists = new PList[numLists];
+        String threadName = Thread.currentThread().getName();
         for (int i=0; i<numLists; i++) {
-            lists[i] = store.getPList("List" + i);
+            Thread.currentThread().setName("C:"+String.valueOf(i));
+            lists[i] = store.getPList(String.valueOf(i));
         }
+        Thread.currentThread().setName(threadName);
 
-        ExecutorService executor = Executors.newFixedThreadPool(100);
+        executor = Executors.newFixedThreadPool(100);
         class A implements Runnable {
             @Override
             public void run() {
+                final String threadName = Thread.currentThread().getName();
                 try {
                     for (int i=0; i<iterations; i++) {
                         PList candidate = lists[i%numLists];
-                        candidate.addLast(String.valueOf(i), payload);
-                        PListEntry entry = candidate.getFirst();
-                        assertTrue(candidate.remove(String.valueOf(i)));
+                        Thread.currentThread().setName("ALRF:"+candidate.getName());
+                        synchronized (plistLocks(candidate)) {
+                            candidate.addLast(String.valueOf(i), payload);
+                            PListEntry entry = candidate.getFirst();
+                            assertTrue(candidate.remove(String.valueOf(i)));
+                        }
                     }
                 } catch (Exception error) {
+                    LOG.error("Unexpcted ex", error);
                     error.printStackTrace();
                     exceptions.add(error);
+                }  finally {
+                    Thread.currentThread().setName(threadName);
                 }
             }
         };
@@ -202,16 +225,22 @@ public class PListTest {
         class B implements  Runnable {
             @Override
             public void run() {
+                final String threadName = Thread.currentThread().getName();
                 try {
                     for (int i=0; i<iterations; i++) {
                         PList candidate = lists[i%numLists];
-                        candidate.addLast(String.valueOf(i), payload);
-                        PListEntry entry = candidate.getFirst();
-                        assertTrue(candidate.remove(String.valueOf(i)));
+                        Thread.currentThread().setName("ALRF:"+candidate.getName());
+                         synchronized (plistLocks(candidate)) {
+                            candidate.addLast(String.valueOf(i), payload);
+                            PListEntry entry = candidate.getFirst();
+                            assertTrue(candidate.remove(String.valueOf(i)));
+                         }
                     }
                 } catch (Exception error) {
                     error.printStackTrace();
                     exceptions.add(error);
+                }  finally {
+                    Thread.currentThread().setName(threadName);
                 }
             }
         };
@@ -244,7 +273,7 @@ public class PListTest {
 
         final int numThreads = 20;
         final int iterations = 2000;
-        ExecutorService executor = Executors.newFixedThreadPool(100);
+        executor = Executors.newFixedThreadPool(100);
         for (int i=0; i<numThreads; i++) {
             new Job(i, PListTest.TaskType.ADD, iterations).run();
         }
@@ -333,7 +362,7 @@ public class PListTest {
         }
 
         LOG.info("parallel add and remove");
-        ExecutorService executor = Executors.newFixedThreadPool(numLists*2);
+        executor = Executors.newFixedThreadPool(numLists*2);
         for (int i=0; i<numLists*2; i++) {
             executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
         }
@@ -344,7 +373,72 @@ public class PListTest {
         assertTrue("no exceptions", exceptions.isEmpty());
     }
 
-    enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
+    // for non determinant issues, increasing this may help diagnose
+    final int numRepeats = 1;
+
+    @Test
+    public void testRepeatStressWithCache() throws Exception {
+        for (int i=0; i<numRepeats;i++) {
+            do_testConcurrentAddIterateRemove(true);
+        }
+    }
+
+    @Test
+    public void testRepeatStressWithOutCache() throws Exception {
+        for (int i=0; i<numRepeats;i++) {
+            do_testConcurrentAddIterateRemove(false);
+        }
+    }
+
+    public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setIndexEnablePageCaching(enablePageCache);
+        store.setIndexPageSize(2*1024);
+        store.setDirectory(directory);
+        store.start();
+
+        final int iterations = 5000;
+        final int numLists = 50;
+
+        LOG.info("create");
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+        }
+
+        LOG.info("fill");
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.ADD, iterations).run();
+        }
+
+        LOG.info("parallel add and remove");
+        executor = Executors.newFixedThreadPool(400);
+        final int numProducer = 5;
+        final int numConsumer = 50;
+        for (int i=0; i<numLists; i++) {
+            for (int j=0; j<numProducer; j++) {
+                executor.execute(new Job(i, PListTest.TaskType.ADD, iterations*2));
+            }
+            for (int k=0;k<numConsumer; k++) {
+                executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations/4));
+            }
+        }
+
+         for (int i=numLists; i<numLists*10; i++) {
+            executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
+         }
+
+        executor.shutdown();
+        LOG.info("wait for parallel work to complete");
+        boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
+        assertTrue("test did not  timeout ", shutdown);
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
 
     class Job implements Runnable {
 
@@ -360,52 +454,102 @@ public class PListTest {
 
         @Override
         public void run() {
+            final String threadName = Thread.currentThread().getName();
             try {
                 PList plist = null;
                 switch (task) {
                     case CREATE:
-                        plist = store.getPList("List-" + id);
+                        Thread.currentThread().setName("C:"+id);
+                        plist = store.getPList(String.valueOf(id));
+                        LOG.info("Job-" + id + ", CREATE");
                         break;
                     case DELETE:
-                        store.removePList("List-" + id);
+                        Thread.currentThread().setName("D:"+id);
+                        store.removePList(String.valueOf(id));
                         break;
                     case ADD:
-                        plist = store.getPList("List-" + id);
+                        Thread.currentThread().setName("A:"+id);
+                        plist = store.getPList(String.valueOf(id));
 
                         for (int j = 0; j < iterations; j++) {
-                            plist.addLast(idSeed + "id" + j, payload);
-                            if (j > 0 && j % (iterations / 2) == 0) {
-                                LOG.info("Job-" + id + ", Done: " + j);
+                            synchronized (plistLocks(plist)) {
+                                plist.addLast ("PL>"  + id + idSeed + "-" + j, payload);
                             }
                         }
+                        LOG.info("Job-" + id + ", Add, done: " + iterations);
                         break;
                     case REMOVE:
-                        plist = store.getPList("List-" + id);
-
-                        for (int j = iterations -1; j >= 0; j--) {
-                            plist.remove(idSeed + "id" + j);
-                            if (j > 0 && j % (iterations / 2) == 0) {
-                                LOG.info("Job-" + id + " Done remove: " + j);
+                        Thread.currentThread().setName("R:"+id);
+                        plist = store.getPList(String.valueOf(id));
+                        synchronized (plistLocks(plist)) {
+
+                            for (int j = iterations -1; j >= 0; j--) {
+                                plist.remove("PL>"  + id + idSeed + "-" + j);
+                                if (j > 0 && j % (iterations / 2) == 0) {
+                                    LOG.info("Job-" + id + " Done remove: " + j);
+                                }
                             }
                         }
                         break;
                     case ITERATE:
-                        plist = store.getPList("List-" + id);
+                        Thread.currentThread().setName("I:"+id);
+                        plist = store.getPList(String.valueOf(id));
 
-                        Iterator<PListEntry> iterator = plist.iterator();
-                        PListEntry element = null;
-                        while (iterator.hasNext()) {
-                            element = iterator.next();
+                        synchronized (plistLocks(plist)) {
+                            Iterator<PListEntry> iterator = plist.iterator();
+                            PListEntry element = null;
+                            while (iterator.hasNext()) {
+                                element = iterator.next();
+                            }
                         }
                         break;
+
+                    case ITERATE_REMOVE:
+                        Thread.currentThread().setName("IRM:"+id);
+                        plist = store.getPList(String.valueOf(id));
+
+                        int removeCount = 0;
+                        synchronized (plistLocks(plist)) {
+
+                            Iterator<PListEntry> removeIterator = plist.iterator();
+                            PListEntry v = null;
+
+                            while (removeIterator.hasNext()) {
+                                v = removeIterator.next();
+                                removeIterator.remove();
+                                if (removeCount++ > iterations) {
+                                    break;
+                                }
+                            }
+                        }
+                        LOG.info("Job-" + id + " Done remove: " + removeCount);
+                        break;
+
                     default:
                 }
 
             } catch (Exception e) {
                 e.printStackTrace();
                 exceptions.add(e);
+                executor.shutdownNow();
+            } finally {
+                Thread.currentThread().setName(threadName);
+            }
+        }
+    }
+
+    Map<PList, Object> locks = new HashMap<PList, Object>();
+    private Object plistLocks(PList plist) {
+        Object lock = null;
+        synchronized (locks) {
+            if (locks.containsKey(plist)) {
+                lock = locks.get(plist);
+             } else {
+                lock = new Object();
+                locks.put(plist, lock);
             }
         }
+        return lock;
     }
 
     @Before
@@ -421,7 +565,7 @@ public class PListTest {
         store = new PListStore();
         store.setDirectory(directory);
         store.start();
-        plist = store.getPList("test");
+        plist = store.getPList("main");
     }
 
     @After

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Wed Aug  3 10:18:18 2011
@@ -32,7 +32,7 @@ import org.apache.kahadb.util.Marshaller
 public class ListIndex<Key,Value> implements Index<Key,Value> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
-
+    public  final static long NOT_SET = -1;
     protected PageFile pageFile;
     protected long headPageId;
     protected long tailPageId;
@@ -40,7 +40,7 @@ public class ListIndex<Key,Value> implem
 
     protected AtomicBoolean loaded = new AtomicBoolean();
 
-    private final ListNode.Marshaller<Key, Value> marshaller = new ListNode.Marshaller<Key, Value>(this);
+    private ListNode.NodeMarshaller<Key, Value> marshaller;
     private Marshaller<Key> keyMarshaller;
     private Marshaller<Value> valueMarshaller;
 
@@ -49,7 +49,7 @@ public class ListIndex<Key,Value> implem
 
     public ListIndex(PageFile pageFile, long headPageId) {
         this.pageFile = pageFile;
-        this.headPageId = headPageId;
+        setHeadPageId(headPageId);
     }
 
     synchronized public void load(Transaction tx) throws IOException {
@@ -61,20 +61,23 @@ public class ListIndex<Key,Value> implem
             if( valueMarshaller == null ) {
                 throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
             }
-            
-            final Page<ListNode<Key,Value>> p = tx.load(headPageId, null);
+
+            marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
+            final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
             if( p.getType() == Page.PAGE_FREE_TYPE ) {
                  // Need to initialize it..
                 ListNode<Key, Value> root = createNode(p);
                 storeNode(tx, root, true);
-                tailPageId = headPageId = p.getPageId();
+                setHeadPageId(p.getPageId());
+                setTailPageId(getHeadPageId());
             } else {
-                ListNode<Key, Value> node = loadNode(tx, headPageId);
+                ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
+                setTailPageId(getHeadPageId());
                 size.addAndGet(node.size(tx));
-                while (node.getNext() != -1) {
+                while (node.getNext() != NOT_SET ) {
                     node = loadNode(tx, node.getNext());
                     size.addAndGet(node.size(tx));
-                    tailPageId = node.getPageId();
+                    setTailPageId(node.getPageId());
                 }
             }
         }
@@ -86,11 +89,11 @@ public class ListIndex<Key,Value> implem
     }
     
     protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
-        return loadNode(tx, headPageId);
+        return loadNode(tx, getHeadPageId());
     }
 
     protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
-        return loadNode(tx, tailPageId);
+        return loadNode(tx, getTailPageId());
     }
 
     synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
@@ -201,25 +204,23 @@ public class ListIndex<Key,Value> implem
         Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
         ListNode<Key, Value> node = page.get();
         node.setPage(page);
+        node.setContainingList(this);
         return node;
     }
 
     ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
-        ListNode<Key,Value> node = new ListNode<Key,Value>(this);
+        ListNode<Key,Value> node = new ListNode<Key,Value>();
         node.setPage(page);
         page.set(node);
+        node.setContainingList(this);
         return node;
     }
 
-    ListNode<Key,Value> createNode(Transaction tx) throws IOException {
-        Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), null);
-        ListNode<Key,Value> node = new ListNode<Key,Value>(this);
-        node.setPage(page);
-        page.set(node);
-        return node;
+    public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
+        return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
     }
 
-    void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
+    public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
         tx.store(node.getPage(), marshaller, overflow);
     }
         
@@ -257,6 +258,10 @@ public class ListIndex<Key,Value> implem
         this.tailPageId = tailPageId;
     }
 
+    public long getTailPageId() {
+       return tailPageId;
+    }
+
     public long size() {
         return size.get();
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Wed Aug  3 10:18:18 2011
@@ -26,6 +26,7 @@ import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
+import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
 /**
@@ -35,22 +36,24 @@ import org.apache.kahadb.util.VariableMa
 public final class ListNode<Key,Value> {
     private final static boolean ADD_FIRST = true;
     private final static boolean ADD_LAST = false;
-    private final static long NOT_SET = -1;
 
     // The index that this node is part of.
-    private final ListIndex<Key,Value> index;
+    private ListIndex<Key,Value> containingList;
 
     // The page associated with this node
     private Page<ListNode<Key,Value>> page;
 
-    protected LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>();
+    private LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>() {
+
+        @Override
+        public String toString() {
+            return "PageId:" + page.getPageId() + ", index:" + containingList + super.toString();
+        }
+    };
 
     // The next page after this one.
-    private long next = NOT_SET;
+    private long next = ListIndex.NOT_SET;
 
-    public int size(Transaction tx) {
-        return entries.size();
-    }
 
     static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value>
     {
@@ -83,11 +86,13 @@ public final class ListNode<Key,Value> {
     private final class ListNodeIterator implements Iterator<ListNode<Key,Value>> {
 
         private final Transaction tx;
+        private final ListIndex<Key,Value> index;
         ListNode<Key,Value> nextEntry;
 
-        private ListNodeIterator(Transaction tx, ListNode<Key,Value> current) throws IOException {
+        private ListNodeIterator(Transaction tx, ListNode<Key,Value> current) {
             this.tx = tx;
             nextEntry = current;
+            index = current.getContainingList();
         }
 
         public boolean hasNext() {
@@ -96,8 +101,8 @@ public final class ListNode<Key,Value> {
 
         public ListNode<Key,Value> next() {
             ListNode<Key,Value> current = nextEntry;
-            if( nextEntry !=null ) {
-                if (nextEntry.next != NOT_SET) {
+            if( current !=null ) {
+                if (current.next != ListIndex.NOT_SET) {
                     try {
                         nextEntry = index.loadNode(tx, current.next);
                     } catch (IOException unexpected) {
@@ -120,64 +125,96 @@ public final class ListNode<Key,Value> {
     private final class ListIterator implements Iterator<Entry<Key, Value>> {
 
         private final Transaction tx;
-        ListNode<Key,Value> current, prev;
+        private final ListIndex<Key,Value> targetList;
+        ListNode<Key,Value> currentNode, previousNode;
         KeyValueEntry<Key, Value> nextEntry;
-        KeyValueEntry<Key, Value>  toRemove;
+        KeyValueEntry<Key, Value> entryToRemove;
 
-        private ListIterator(Transaction tx, ListNode<Key,Value> current, long nextIndex) throws IOException {
+        private ListIterator(Transaction tx, ListNode<Key,Value> current, long start) {
             this.tx = tx;
-            this.current = current;
+            this.currentNode = current;
+            this.targetList = current.getContainingList();
             nextEntry = current.entries.getHead();
-            if (nextIndex > 0 && nextEntry != null) {
-                for (long i=0; i<nextIndex; i++) {
-                    nextEntry = nextEntry.getNext();
-                    if (nextEntry == null) {
-                        if (!nextFromNextListNode())
-                            throw new NoSuchElementException("Index out of range: " + nextIndex);
-                        }
-                    }
-                }
+            if (start > 0) {
+                moveToRequestedStart(start);
+            }
+        }
+
+        private void moveToRequestedStart(final long start) {
+            long count = 0;
+            while (hasNext() && count < start) {
+                next();
+                count++;
+            }
+            if (!hasNext()) {
+                throw new NoSuchElementException("Index " + start + " out of current range: " + count);
             }
+        }
 
-        private boolean nextFromNextListNode() {
-            boolean haveNext = false;
-            if (current.getNext() != NOT_SET) {
+        private KeyValueEntry<Key, Value> getFromNextNode() {
+            KeyValueEntry<Key, Value> result = null;
+            if (currentNode.getNext() != ListIndex.NOT_SET) {
                 try {
-                    prev = current;
-                    current = index.loadNode(tx, current.getNext());
+                    previousNode = currentNode;
+                    currentNode = targetList.loadNode(tx, currentNode.getNext());
                 } catch (IOException unexpected) {
                     NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
                     e.initCause(unexpected);
                     throw e;
                 }
-                nextEntry = current.entries.getHead();
-                haveNext = nextEntry != null;
+                result = currentNode.entries.getHead();
             }
-            return haveNext;
+            return result;
         }
 
         public boolean hasNext() {
-            return nextEntry !=null || nextFromNextListNode();
+            if (nextEntry == null) {
+                nextEntry = getFromNextNode();
+            }
+            return nextEntry != null;
         }
 
         public Entry<Key, Value> next() {
             if( nextEntry !=null ) {
-                toRemove = nextEntry;
-                nextEntry=toRemove.getNext();
-                return toRemove;
+                entryToRemove = nextEntry;
+                nextEntry = entryToRemove.getNext();
+                return entryToRemove;
             } else {
                 throw new NoSuchElementException();
             }
         }
 
         public void remove() {
-            if (toRemove == null) {
-                throw new IllegalStateException("can only remove once, call next again");
+            if (entryToRemove == null) {
+                throw new IllegalStateException("can only remove once, call hasNext();next() again");
             }
             try {
-                doRemove(tx, current, prev, toRemove);
-                index.onRemove();
-                toRemove = null;
+                entryToRemove.unlink();
+                entryToRemove = null;
+                ListNode<Key,Value> toRemoveNode = null;
+                if (currentNode.entries.isEmpty()) {
+                    // may need to free this node
+                    if (currentNode.isHead() && currentNode.isTail()) {
+                        // store empty list
+                    } else if (currentNode.isHead()) {
+                        // new head
+                        toRemoveNode = currentNode;
+                        nextEntry = getFromNextNode();
+                        targetList.setHeadPageId(currentNode.getPageId());
+                    } else if (currentNode.isTail()) {
+                        toRemoveNode = currentNode;
+                        previousNode.setNext(ListIndex.NOT_SET);
+                        previousNode.store(tx);
+                        targetList.setTailPageId(previousNode.getPageId());
+                    }
+                }
+                targetList.onRemove();
+
+                if (toRemoveNode != null) {
+                    tx.free(toRemoveNode.getPage());
+                } else {
+                    currentNode.store(tx);
+                }
             } catch (IOException unexpected) {
                 IllegalStateException e = new IllegalStateException(unexpected.getLocalizedMessage());
                 e.initCause(unexpected);
@@ -192,11 +229,13 @@ public final class ListNode<Key,Value> {
      * @param <Key>
      * @param <Value>
      */
-    static public class Marshaller<Key,Value> extends VariableMarshaller<ListNode<Key,Value>> {
-        private final ListIndex<Key,Value> index;
-
-        public Marshaller(ListIndex<Key,Value> index) {
-            this.index = index;
+    static public final class NodeMarshaller<Key,Value> extends VariableMarshaller<ListNode<Key,Value>> {
+        private final Marshaller<Key> keyMarshaller;
+        private final Marshaller<Value> valueMarshaller;
+
+        public NodeMarshaller(Marshaller<Key> keyMarshaller, Marshaller<Value> valueMarshaller) {
+            this.keyMarshaller = keyMarshaller;
+            this.valueMarshaller = valueMarshaller;
         }
 
         public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException {
@@ -209,58 +248,31 @@ public final class ListNode<Key,Value> {
             os.writeShort(count);
             KeyValueEntry<Key, Value> entry = node.entries.getHead();
             while (entry != null) {
-                index.getKeyMarshaller().writePayload((Key) entry.getKey(), os);
-                index.getValueMarshaller().writePayload((Value) entry.getValue(), os);
+                keyMarshaller.writePayload((Key) entry.getKey(), os);
+                valueMarshaller.writePayload((Value) entry.getValue(), os);
                 entry = entry.getNext();
             }
         }
 
         @SuppressWarnings("unchecked")
         public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
-            ListNode<Key,Value> node = new ListNode<Key,Value>(index);
+            ListNode<Key,Value> node = new ListNode<Key,Value>();
             node.next = is.readLong();
             final short size = is.readShort();
             for (short i = 0; i < size; i++) {
                 node.entries.addLast(
-                        new KeyValueEntry(index.getKeyMarshaller().readPayload(is),
-                                                     index.getValueMarshaller().readPayload(is)));
+                        new KeyValueEntry(keyMarshaller.readPayload(is),
+                                                     valueMarshaller.readPayload(is)));
             }
             return node;
         }
     }
 
-    public ListNode(ListIndex<Key, Value> index) {
-        this.index = index;
-    }
-
-    private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry<Key, Value> entry) throws IOException {
-        entry.unlink();
-        if (current.entries.isEmpty()) {
-                if (current.getPageId() == index.getHeadPageId()) {
-                    if (current.getNext() != NOT_SET) {
-                        // new head
-                        index.setHeadPageId(current.getNext());
-                        tx.free(current.getPageId());
-                    } else {
-                        //  store current in empty state
-                        store(tx);
-                    }
-                } else {
-                    // need to unlink the node
-                    prev.setNext(current.next);
-                    index.storeNode(tx, prev, false);
-                    tx.free(current.getPageId());
-                }
-        } else {
-            store(tx);
-        }
-    }
-
     public Value put(Transaction tx, Key key, Value value) throws IOException {
         if (key == null) {
             throw new IllegalArgumentException("Key cannot be null");
         }
-        entries.addLast(new KeyValueEntry(key, value));
+        entries.addLast(new KeyValueEntry<Key, Value>(key, value));
         store(tx, ADD_LAST);
         return null;
     }
@@ -269,14 +281,14 @@ public final class ListNode<Key,Value> {
         if (key == null) {
             throw new IllegalArgumentException("Key cannot be null");
         }
-        entries.addFirst(new KeyValueEntry(key, value));
+        entries.addFirst(new KeyValueEntry<Key, Value>(key, value));
         store(tx, ADD_FIRST);
         return null;
     }
 
     private void store(Transaction tx, boolean addFirst) throws IOException {
         try {
-            index.storeNode(tx, this, false);
+            getContainingList().storeNode(tx, this, false);
         } catch ( Transaction.PageOverflowIOException e ) {
                 // If we get an overflow
                 split(tx, addFirst);
@@ -284,23 +296,23 @@ public final class ListNode<Key,Value> {
     }
 
     private void store(Transaction tx) throws IOException {
-        index.storeNode(tx, this, false);
+        getContainingList().storeNode(tx, this, false);
     }
 
     private void split(Transaction tx, boolean isAddFirst) throws IOException {
-        ListNode<Key, Value> extension = index.createNode(tx);
+        ListNode<Key, Value> extension = getContainingList().createNode(tx);
         if (isAddFirst) {
             // head keeps the first entry, insert extension with the rest
             extension.setNext(this.getNext());
             this.setNext(extension.getPageId());
             extension.setEntries(entries.getHead().splitAfter());
         }  else {
-            index.setTailPageId(extension.getPageId());
             this.setNext(extension.getPageId());
             extension.setEntries(entries.getTail().getPrevious().splitAfter());
+            getContainingList().setTailPageId(extension.getPageId());
         }
-        index.storeNode(tx, this, false);
         extension.store(tx, isAddFirst);
+        store(tx);
     }
 
     // called after a split
@@ -308,7 +320,7 @@ public final class ListNode<Key,Value> {
         this.entries = list;
     }
 
-    public Value get(Transaction tx, Key key) throws IOException {
+    public Value get(Transaction tx, Key key) {
         if (key == null) {
             throw new IllegalArgumentException("Key cannot be null");
         }
@@ -324,15 +336,15 @@ public final class ListNode<Key,Value> {
         return result;
     }
 
-    public boolean isEmpty(final Transaction tx) throws IOException {
+    public boolean isEmpty(final Transaction tx)  {
         return entries.isEmpty();
     }
 
-    public Entry<Key,Value> getFirst(Transaction tx) throws IOException {
+    public Entry<Key,Value> getFirst(Transaction tx) {
         return entries.getHead();
     }
 
-    public Entry<Key,Value> getLast(Transaction tx) throws IOException {
+    public Entry<Key,Value> getLast(Transaction tx) {
         return entries.getTail();
     }
 
@@ -353,7 +365,7 @@ public final class ListNode<Key,Value> {
         tx.free(this.getPageId());
     }
 
-    public boolean contains(Transaction tx, Key key) throws IOException {
+    public boolean contains(Transaction tx, Key key) {
         if (key == null) {
             throw new IllegalArgumentException("Key cannot be null");
         }
@@ -392,10 +404,30 @@ public final class ListNode<Key,Value> {
     public void setNext(long next) {
         this.next = next;
     }
-    
+
+    public void setContainingList(ListIndex<Key, Value> list) {
+        this.containingList = list;
+    }
+
+    public ListIndex<Key,Value> getContainingList() {
+        return containingList;
+    }
+
+    public boolean isHead() {
+        return getPageId() == containingList.getHeadPageId();
+    }
+
+    public boolean isTail() {
+        return getPageId() == containingList.getTailPageId();
+    }
+
+    public int size(Transaction tx) {
+        return entries.size();
+    }
+
     @Override
     public String toString() {
-        return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]";
+        return "[ListNode(" + (page != null ?  page.getPageId() + "->" + next : "null") + ")[" + entries.size() + "]]";
     }
 }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Wed Aug  3 10:18:18 2011
@@ -171,7 +171,7 @@ public class PageFile {
 
         @Override
         public String toString() {
-            return "[PageWrite:"+page.getPageId()+"]";
+            return "[PageWrite:"+page.getPageId()+ "-" + page.getType()  + "]";
         }
 
         @SuppressWarnings("unchecked")
@@ -827,9 +827,7 @@ public class PageFile {
 
     public void freePage(long pageId) {
         freeList.add(pageId);
-        if( enablePageCaching ) {
-            pageCache.remove(pageId);
-        }
+        removeFromCache(pageId);
     }
     
     @SuppressWarnings("unchecked")
@@ -932,9 +930,9 @@ public class PageFile {
         }
     }
 
-    void removeFromCache(Page page) {
+    void removeFromCache(long pageId) {
         if (enablePageCaching) {
-            pageCache.remove(page.getPageId());
+            pageCache.remove(pageId);
         }
     }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Wed Aug  3 10:18:18 2011
@@ -129,8 +129,6 @@ public class Transaction implements Iter
      *         if the PageFile is not loaded
      */
     public <T> Page<T> allocate(int count) throws IOException {
-        // TODO: we need to track allocated pages so that they can be returned if the
-        // transaction gets rolled back.
         Page<T> rc = pageFile.allocate(count);
         allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
         return rc;

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LinkedNode.java Wed Aug  3 10:18:18 2011
@@ -197,7 +197,7 @@ public class LinkedNode<T extends Linked
 
     public void linkToHead(LinkedNodeList<T> target) {
         if (list != null) {
-            throw new IllegalArgumentException("This node is already linked to a node");
+            throw new IllegalArgumentException("This node is already linked to a list");
         }
 
         if (target.head == null) {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=1153420&r1=1153419&r2=1153420&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Wed Aug  3 10:18:18 2011
@@ -204,7 +204,7 @@ public class SequenceSet extends LinkedN
                 return sequence;
             }
             if (sequence.range() > count ) {
-                Sequence rc = new Sequence(sequence.first, sequence.first+count);
+                Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
                 sequence.first+=count;
                 return rc;
             }



Mime
View raw message