activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1170849 - in /activemq/trunk/kahadb/src: main/java/org/apache/kahadb/index/ListIndex.java main/java/org/apache/kahadb/index/ListNode.java test/java/org/apache/kahadb/index/ListIndexTest.java
Date Wed, 14 Sep 2011 21:15:40 GMT
Author: tabish
Date: Wed Sep 14 21:15:39 2011
New Revision: 1170849

URL: http://svn.apache.org/viewvc?rev=1170849&view=rev
Log:
Some updates and changes to support some work on https://issues.apache.org/jira/browse/AMQ-3467

Enhance the ListIndex to improve performance of the remove and put operations. Put is now a
real put: it will update the element with the given key if it exists in the list, otherwise
it will add it to the end.  Also adds the ability for a single key/value pair to span more
than one page when needed; multiple elements will still reside on one page whenever possible.

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Wed Sep 14 21:15:39 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.kahadb.index.ListNode.ListIterator;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
@@ -103,6 +104,11 @@ public class ListIndex<Key,Value> implem
 
     synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
         assertLoaded();
+
+        if (size.get() == 0) {
+            return false;
+        }
+
         for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext();
) {
             Map.Entry<Key,Value> candidate = iterator.next();
             if (key.equals(candidate.getKey())) {
@@ -112,11 +118,17 @@ public class ListIndex<Key,Value> implem
         return false;
     }
 
+    private ListNode<Key, Value> lastGetNodeCache = null;
+    private Map.Entry<Key, Value> lastGetEntryCache = null;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     synchronized public Value get(Transaction tx, Key key) throws IOException {
         assertLoaded();
         for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext();
) {
             Map.Entry<Key,Value> candidate = iterator.next();
             if (key.equals(candidate.getKey())) {
+                this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
+                this.lastGetEntryCache = candidate;
                 return candidate.getValue();
             }
         }
@@ -124,10 +136,52 @@ public class ListIndex<Key,Value> implem
     }
 
     /**
-      * appends to the list
-     * @return null
+     * Update the value of the item with the given key in the list if it exists, otherwise
+     * it appends the value to the end of the list.
+     *
+     * @return the old value contained in the list if one exists or null.
      */
+    @SuppressWarnings({ "rawtypes" })
     synchronized public Value put(Transaction tx, Key key, Value value) throws IOException
{
+
+        Value oldValue = null;
+
+        if (lastGetNodeCache != null) {
+
+            if(lastGetEntryCache.getKey().equals(key)) {
+                oldValue = lastGetEntryCache.setValue(value);
+                lastGetEntryCache.setValue(value);
+                lastGetNodeCache.storeUpdate(tx);
+                return oldValue;
+            }
+
+            // This searches from the last location of a call to get for the element to replace
+            // all the way to the end of the ListIndex.
+            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
+            while (iterator.hasNext()) {
+                Map.Entry<Key, Value> entry = iterator.next();
+                if (entry.getKey().equals(key)) {
+                    oldValue = entry.setValue(value);
+                    ((ListIterator) iterator).getCurrent().storeUpdate(tx);
+                    return oldValue;
+                }
+            }
+        }
+
+        // Not found because the cache wasn't set or it's not at the end of the list so we
+        // start from the beginning and go to the cached location or the end, then we do
+        // an add if it's not found.
+        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
+        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache)
{
+            Map.Entry<Key, Value> entry = iterator.next();
+            if (entry.getKey().equals(key)) {
+                oldValue = entry.setValue(value);
+                ((ListIterator) iterator).getCurrent().storeUpdate(tx);
+                return oldValue;
+            }
+        }
+
+        // Not found so add it last.
         return add(tx, key, value);
     }
 
@@ -145,15 +199,40 @@ public class ListIndex<Key,Value> implem
         return null;
     }
 
+    @SuppressWarnings("rawtypes")
     synchronized public Value remove(Transaction tx, Key key) throws IOException {
         assertLoaded();
-        for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext();
) {
-            Map.Entry<Key,Value> candidate = iterator.next();
-            if (key.equals(candidate.getKey())) {
+
+        if (size.get() == 0) {
+            return null;
+        }
+
+        if (lastGetNodeCache != null) {
+
+            // This searches from the last location of a call to get for the element to remove
+            // all the way to the end of the ListIndex.
+            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
+            while (iterator.hasNext()) {
+                Map.Entry<Key, Value> entry = iterator.next();
+                if (entry.getKey().equals(key)) {
+                    iterator.remove();
+                    return entry.getValue();
+                }
+            }
+        }
+
+        // Not found because the cache wasn't set or it's not at the end of the list so we
+        // start from the beginning and go to the cached location or the end to find the
+        // element to remove.
+        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
+        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache)
{
+            Map.Entry<Key, Value> entry = iterator.next();
+            if (entry.getKey().equals(key)) {
                 iterator.remove();
-                return candidate.getValue();
+                return entry.getValue();
             }
         }
+
         return null;
     }
 
@@ -227,6 +306,7 @@ public class ListIndex<Key,Value> implem
 
     public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow)
throws IOException {
         tx.store(node.getPage(), marshaller, overflow);
+        flushCache();
     }
 
     public PageFile getPageFile() {
@@ -270,4 +350,9 @@ public class ListIndex<Key,Value> implem
     public long size() {
         return size.get();
     }
+
+    private void flushCache() {
+        this.lastGetEntryCache = null;
+        this.lastGetNodeCache = null;
+    }
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Wed Sep 14 21:15:39 2011
@@ -57,7 +57,7 @@ public final class ListNode<Key,Value> {
     static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key,
Value>> implements Entry<Key, Value>
     {
         private final Key key;
-        private final Value value;
+        private Value value;
 
         public KeyValueEntry(Key key, Value value) {
             this.key = key;
@@ -73,7 +73,9 @@ public final class ListNode<Key,Value> {
         }
 
         public Value setValue(Value value) {
-            throw new UnsupportedOperationException();
+            Value oldValue = this.value;
+            this.value = value;
+            return oldValue;
         }
 
         @Override
@@ -121,7 +123,7 @@ public final class ListNode<Key,Value> {
         }
     }
 
-    private final class ListIterator implements Iterator<Entry<Key, Value>> {
+    final class ListIterator implements Iterator<Entry<Key, Value>> {
 
         private final Transaction tx;
         private final ListIndex<Key,Value> targetList;
@@ -220,6 +222,10 @@ public final class ListNode<Key,Value> {
                 throw e;
             }
         }
+
+        ListNode<Key, Value> getCurrent() {
+            return this.currentNode;
+        }
     }
 
     /**
@@ -285,9 +291,28 @@ public final class ListNode<Key,Value> {
         return null;
     }
 
+    public void storeUpdate(Transaction tx) throws IOException {
+        try {
+            if (this.entries.size() == 1) {
+                getContainingList().storeNode(tx, this, true);
+            } else {
+                getContainingList().storeNode(tx, this, false);
+            }
+        } catch ( Transaction.PageOverflowIOException e ) {
+            split(tx, ADD_FIRST);
+        }
+    }
+
     private void store(Transaction tx, boolean addFirst) throws IOException {
         try {
-            getContainingList().storeNode(tx, this, false);
+            // When we split to a node of one element we can span multiple
+            // pages for that entry, otherwise we keep the entries on one
+            // page to avoid fragmented reads and segment the list traversal.
+            if (this.entries.size() == 1) {
+                getContainingList().storeNode(tx, this, true);
+            } else {
+                getContainingList().storeNode(tx, this, false);
+            }
         } catch ( Transaction.PageOverflowIOException e ) {
             // If we get an overflow
             split(tx, addFirst);
@@ -295,7 +320,11 @@ public final class ListNode<Key,Value> {
     }
 
     private void store(Transaction tx) throws IOException {
-        getContainingList().storeNode(tx, this, false);
+        if (this.entries.size() == 1) {
+            getContainingList().storeNode(tx, this, true);
+        } else {
+            getContainingList().storeNode(tx, this, false);
+        }
     }
 
     private void split(Transaction tx, boolean isAddFirst) throws IOException {
@@ -311,7 +340,7 @@ public final class ListNode<Key,Value> {
             getContainingList().setTailPageId(extension.getPageId());
         }
         extension.store(tx, isAddFirst);
-        store(tx);
+        store(tx, true);
     }
 
     // called after a split

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java Wed Sep 14 21:15:39 2011
@@ -16,12 +16,24 @@
  */
 package org.apache.kahadb.index;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.text.NumberFormat;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,6 +96,38 @@ public class ListIndexTest extends Index
         tx.commit();
     }
 
+    public void testPut() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        int count = 30;
+        tx = pf.tx();
+        doInsert(count);
+        tx.commit();
+        assertEquals("correct size", count, listIndex.size());
+
+        tx = pf.tx();
+        Long value = listIndex.get(tx, key(10));
+        assertNotNull(value);
+        listIndex.put(tx, key(10), Long.valueOf(1024));
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.get(tx, key(10));
+        assertEquals(1024L, value.longValue());
+        assertTrue(listIndex.size() == 30);
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.put(tx, key(31), Long.valueOf(2048));
+        assertNull(value);
+        assertTrue(listIndex.size() == 31);
+        tx.commit();
+    }
+
     public void testAddFirst() throws Exception {
         createPageFileAndIndex(100);
 
@@ -273,7 +317,7 @@ public class ListIndexTest extends Index
         final int COUNT = 50000;
         long start = System.currentTimeMillis();
         for (int i = 0; i < COUNT; i++) {
-             listIndex.put(tx, key(i), (long) i);
+             listIndex.add(tx, key(i), (long) i);
              tx.commit();
         }
         LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + "
mills");
@@ -295,9 +339,85 @@ public class ListIndexTest extends Index
         LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
     }
 
+    private int getMessageSize(int min, int max) {
+        return min + (int)(Math.random() * ((max - min) + 1));
+    }
+
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<Long, String> test = new ListIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final long NUM_ADDITIONS = 32L;
+
+        LinkedList<Long> expected = new LinkedList<Long>();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.add(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+
+        expected.clear();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.addFirst(tx, i+NUM_ADDITIONS, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i+NUM_ADDITIONS);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+
+        expected.clear();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.put(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+    }
+
     void doInsertReverse(int count) throws Exception {
         for (int i = count - 1; i >= 0; i--) {
-            ((ListIndex) index).addFirst(tx, key(i), (long) i);
+            ((ListIndex<String, Long>) index).addFirst(tx, key(i), (long) i);
             tx.commit();
         }
     }
@@ -306,4 +426,35 @@ public class ListIndexTest extends Index
     protected String key(int i) {
         return "key:" + nf.format(i);
     }
+
+    static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>>
{
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws
IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            oout.writeObject(object);
+            oout.flush();
+            oout.close();
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        @SuppressWarnings("unchecked")
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+                IOException ioe = new IOException("Failed to read HashSet<String>:
" + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            }
+        }
+    }
 }



Mime
View raw message