Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1130607&r1=1130606&r2=1130607&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Thu Jun 2 15:28:30 2011 @@ -33,18 +33,20 @@ import org.apache.kahadb.util.VariableMa * one overflowing Page of a PageFile. */ public final class ListNode { + private final static boolean ADD_FIRST = true; + private final static boolean ADD_LAST = false; + private final static long NOT_SET = -1; // The index that this node is part of. private final ListIndex index; - // The parent node or null if this is the root node of the List - private ListNode parent; + // The page associated with this node private Page> page; protected LinkedNodeList> entries = new LinkedNodeList>(); // The next page after this one. - private long next = -1; + private long next = NOT_SET; public int size(Transaction tx) { return entries.size(); @@ -95,9 +97,9 @@ public final class ListNode { public ListNode next() { ListNode current = nextEntry; if( nextEntry !=null ) { - if (nextEntry.next != -1) { + if (nextEntry.next != NOT_SET) { try { - nextEntry = index.loadNode(tx, current.next, current); + nextEntry = index.loadNode(tx, current.next); } catch (IOException unexpected) { IllegalStateException e = new IllegalStateException("failed to load next: " + current.next + ", reason: " + unexpected.getLocalizedMessage()); e.initCause(unexpected); @@ -118,16 +120,16 @@ public final class ListNode { private final class ListIterator implements Iterator> { private final Transaction tx; - ListNode current; + ListNode current, prev; KeyValueEntry nextEntry; KeyValueEntry toRemove; - private ListIterator(Transaction tx, ListNode current, int nextIndex) throws IOException { + private ListIterator(Transaction tx, ListNode current, long nextIndex) throws IOException { this.tx = tx; this.current = current; nextEntry = current.entries.getHead(); - if (nextIndex > 0) { - for (int i=0; i 0 && nextEntry != null) { + for (long i=0; i { private boolean nextFromNextListNode() { boolean haveNext = false; - if (current.getNext() != -1) { + if (current.getNext() != NOT_SET) { try { - current = index.loadNode(tx, current.getNext(), current); + prev = current; + current = index.loadNode(tx, current.getNext()); } catch (IOException unexpected) { NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage()); e.initCause(unexpected); @@ -172,7 +175,7 @@ public final class ListNode { throw new IllegalStateException("can only remove once, call next again"); } try { - doRemove(tx, current, toRemove); + doRemove(tx, current, prev, toRemove); index.onRemove(); toRemove = null; } catch (IOException unexpected) { @@ -197,7 +200,7 @@ public final class ListNode { } public void writePayload(ListNode node, DataOutput os) throws IOException { - // Write the keys + os.writeLong(node.next); short count = (short)node.entries.size(); // cast may truncate value... if( count != node.entries.size() ) { throw new IOException("short over flow, too many entries in list: " + node.entries.size()); @@ -215,6 +218,7 @@ public final class ListNode { @SuppressWarnings("unchecked") public ListNode readPayload(DataInput is) throws IOException { ListNode node = new ListNode(index); + node.next = is.readLong(); final short size = is.readShort(); for (short i = 0; i < size; i++) { node.entries.addLast( @@ -229,40 +233,26 @@ public final class ListNode { this.index = index; } - public void setEmpty() { - } - - public Value remove(Transaction tx, Key key) throws IOException { - Value result = null; - KeyValueEntry entry = entries.getHead(); - while (entry != null) { - if (entry.getKey().equals(key)) { - result = entry.getValue(); - doRemove(tx, this, entry); - break; - } - entry = entry.getNext(); - } - return result; - } - - private void doRemove(Transaction tx, ListNode current, KeyValueEntry entry) throws IOException { + private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry entry) throws IOException { entry.unlink(); if (current.entries.isEmpty()) { if (current.getPageId() == index.getHeadPageId()) { - if (current.getNext() != -1) { + if (current.getNext() != NOT_SET) { // new head index.setHeadPageId(current.getNext()); tx.free(current.getPageId()); + } else { + // store current in empty state + store(tx); } } else { // need to unlink the node - current.parent.setNext(current.getNext()); + prev.setNext(current.next); + index.storeNode(tx, prev, false); tx.free(current.getPageId()); - index.storeNode(tx, current.parent, false); } } else { - store(tx, true); + store(tx); } } @@ -271,7 +261,7 @@ public final class ListNode { throw new IllegalArgumentException("Key cannot be null"); } entries.addLast(new KeyValueEntry(key, value)); - store(tx, false); + store(tx, ADD_LAST); return null; } @@ -280,29 +270,30 @@ public final class ListNode { throw new IllegalArgumentException("Key cannot be null"); } entries.addFirst(new KeyValueEntry(key, value)); - store(tx, true); + store(tx, ADD_FIRST); return null; } private void store(Transaction tx, boolean addFirst) throws IOException { try { - index.storeNode(tx, this, allowOverflow()); + index.storeNode(tx, this, false); } catch ( Transaction.PageOverflowIOException e ) { // If we get an overflow split(tx, addFirst); } } - private boolean allowOverflow() { - return false; + private void store(Transaction tx) throws IOException { + index.storeNode(tx, this, false); } private void split(Transaction tx, boolean isAddFirst) throws IOException { - ListNode extension = index.createNode(tx, this); + ListNode extension = index.createNode(tx); if (isAddFirst) { - extension.setEntries(entries.getHead().splitAfter()); + // head keeps the first entry, insert extension with the rest extension.setNext(this.getNext()); this.setNext(extension.getPageId()); + extension.setEntries(entries.getHead().splitAfter()); } else { index.setTailPageId(extension.getPageId()); this.setNext(extension.getPageId()); @@ -345,7 +336,7 @@ public final class ListNode { return entries.getTail(); } - public Iterator> iterator(final Transaction tx, int pos) throws IOException { + public Iterator> iterator(final Transaction tx, long pos) throws IOException { return new ListIterator(tx, this, pos); } @@ -386,14 +377,6 @@ public final class ListNode { return page.getPageId(); } - public ListNode getParent() { - return parent; - } - - public void setParent(ListNode parent) { - this.parent = parent; - } - public Page> getPage() { return page; } @@ -412,7 +395,7 @@ public final class ListNode { @Override public String toString() { - return "[ListNode "+ entries.toString() + "]"; + return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]"; } } Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1130607&r1=1130606&r2=1130607&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Jun 2 15:28:30 2011 @@ -64,6 +64,7 @@ public class PageFile { // 4k Default page size. public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000)); + public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));; private static final int RECOVERY_FILE_HEADER_SIZE=1024*4; private static final int PAGE_FILE_HEADER_SIZE=1024*4; @@ -103,8 +104,8 @@ public class PageFile { // The cache of recently used pages. private boolean enablePageCaching=true; // How many pages will we keep in the cache? - private int pageCacheSize = 100; - + private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE; + // Should first log the page write to the recovery buffer? Avoids partial // page write failures.. private boolean enableRecoveryFile=true; @@ -129,7 +130,7 @@ public class PageFile { // Persistent settings stored in the page file. private MetaData metaData; - + /** * Use to keep track of updated pages which have not yet been committed. */ @@ -682,7 +683,7 @@ public class PageFile { public long getFreePageCount() { assertLoaded(); - return freeList.size(); + return freeList.rangeSize(); } public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) { Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1130607&r1=1130606&r2=1130607&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Thu Jun 2 15:28:30 2011 @@ -38,7 +38,7 @@ import org.apache.kahadb.util.SequenceSe * do multiple update operations in a single unit of work. */ public class Transaction implements Iterable { - + /** * The PageOverflowIOException occurs when a page write is requested * and it's data is larger than what would fit into a single page. @@ -142,7 +142,7 @@ public class Transaction implements Iter /** * Frees up a previously allocated page so that it can be re-allocated again. * - * @param page the page to free up + * @param pageId the page to free up * @throws IOException * If an disk error occurred. * @throws IllegalStateException @@ -155,7 +155,7 @@ public class Transaction implements Iter /** * Frees up a previously allocated sequence of pages so that it can be re-allocated again. * - * @param page the initial page of the sequence that will be getting freed + * @param pageId the initial page of the sequence that will be getting freed * @param count the number of pages in the sequence * * @throws IOException @@ -216,6 +216,8 @@ public class Transaction implements Iter } page.makeFree(getWriteTransactionId()); + // ensure free page is visible while write is pending + pageFile.addToCache(page.copy()); DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize()); page.write(out); @@ -451,7 +453,7 @@ public class Transaction implements Iter } if (page.getType() == Page.PAGE_FREE_TYPE) { - throw new EOFException("Chunk stream does not exist at page: " + page.getPageId()); + throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free"); } return page; @@ -560,7 +562,6 @@ public class Transaction implements Iter * iterated. * * @param includeFreePages - if true, free pages are included in the iteration - * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction. * @throws IllegalStateException * if the PageFile is not loaded */ Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=1130607&r1=1130606&r2=1130607&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Jun 2 15:28:30 2011 @@ -280,4 +280,14 @@ public class SequenceSet extends LinkedN return false; } + public long rangeSize() { + long result = 0; + Sequence sequence = getHead(); + while (sequence != null) { + result += sequence.range(); + sequence = sequence.getNext(); + } + return result; + } + } \ No newline at end of file Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff ============================================================================== --- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java (original) +++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java Thu Jun 2 15:28:30 2011 @@ -228,39 +228,9 @@ public class ListIndexTest extends Index tx.commit(); } - public void testVisitor() throws Exception { - createPageFileAndIndex(100); - ListIndex index = ((ListIndex) this.index); - this.index.load(tx); - tx.commit(); - - // Insert in reverse order.. - doInsert(1000); - - this.index.unload(tx); - tx.commit(); - this.index.load(tx); - tx.commit(); - - // BTree should iterate it in sorted order. - - /*index.visit(tx, new BTreeVisitor(){ - public boolean isInterestedInKeysBetween(String first, String second) { - return true; - } - public void visit(List keys, List values) { - } - });*/ - - - this.index.unload(tx); - tx.commit(); - } - - public void testRandomRemove() throws Exception { - createPageFileAndIndex(100); + createPageFileAndIndex(4*1024); ListIndex index = ((ListIndex) this.index); this.index.load(tx); tx.commit(); @@ -295,21 +265,34 @@ public class ListIndexTest extends Index index.remove(tx, key(1566)); } - public void testLargeAppendTimed() throws Exception { - createPageFileAndIndex(100); + public void testLargeAppendRemoveTimed() throws Exception { + createPageFileAndIndex(1024*4); ListIndex listIndex = ((ListIndex) this.index); this.index.load(tx); tx.commit(); final int COUNT = 50000; long start = System.currentTimeMillis(); for (int i = 0; i < COUNT; i++) { - //String test = new String("test" + i); - //ByteSequence bs = new ByteSequence(test.getBytes()); listIndex.put(tx, key(i), (long) i); tx.commit(); } LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills"); LOG.info("Page count: " + listIndex.getPageFile().getPageCount()); + + start = System.currentTimeMillis(); + tx = pf.tx(); + int removeCount = 0; + Iterator> iterator = index.iterator(tx); + while (iterator.hasNext()) { + iterator.next(); + iterator.remove(); + removeCount++; + } + tx.commit(); + assertEquals("Removed all", COUNT, removeCount); + LOG.info("Time to remove " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills"); + LOG.info("Page count: " + listIndex.getPageFile().getPageCount()); + LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount()); } void doInsertReverse(int count) throws Exception {