activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r684838 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/impl/index/robusthash/ test/java/org/apache/kahadb/impl/index/robusthash/
Date Mon, 11 Aug 2008 17:46:39 GMT
Author: chirino
Date: Mon Aug 11 10:46:37 2008
New Revision: 684838

URL: http://svn.apache.org/viewvc?rev=684838&view=rev
Log:
Adding an initial pass at a robust hash index implementation. The HashTest is still failing.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashEntry.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndexMBean.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashTest.java
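
For context, a rough usage sketch of the new index (not part of this commit): it assumes the caller supplies an IndexManager and a key Marshaller from the surrounding kahadb module, and that keys are Comparable.

    // Hypothetical usage sketch, not part of this commit; assumes an IndexManager
    // and a key Marshaller are available from the surrounding kahadb module.
    import java.io.File;
    import java.io.IOException;

    import org.apache.kahadb.Marshaller;
    import org.apache.kahadb.StoreEntry;
    import org.apache.kahadb.impl.index.IndexManager;
    import org.apache.kahadb.impl.index.robusthash.HashIndex;

    public class RobustHashIndexUsageSketch {
        static StoreEntry storeAndLookup(File dir, IndexManager indexManager,
                                         Marshaller keyMarshaller,
                                         String key, StoreEntry value) throws IOException {
            HashIndex index = new HashIndex(dir, "usage-sketch", indexManager);
            index.setKeyMarshaller(keyMarshaller); // keys must be Comparable
            index.load();                          // sets up bins/buffers, compacts any existing file
            index.store(key, value);               // hashes the key to a bin, inserts in sorted order
            StoreEntry found = index.get(key);     // resolved back through the IndexManager
            index.unload();                        // checkpoints pending writes, stops the writer thread
            return found;
        }
    }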

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.IOException;
+
+/**
+ * Bin in a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashBin {
+    private HashIndex hashIndex;
+    private int id;
+    private int maximumEntries;
+    private int size;
+    private int numberOfPages =0;
+    private HashPageInfo root = null;
+    private HashPageInfo tail = null;
+
+    /**
+     * Constructor
+     * 
+     * @param hashIndex
+     * @param id
+     * @param maximumEntries
+     */
+    HashBin(HashIndex hashIndex, int id, int maximumEntries) {
+        this.hashIndex = hashIndex;
+        this.id = id;
+        this.maximumEntries = maximumEntries;
+    }
+
+    public String toString() {
+        return "HashBin[" + getId() + "]";
+    }
+
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof HashBin) {
+            HashBin other = (HashBin)o;
+            result = other.id == id;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return (int)getId();
+    }
+
+     int  getId() {
+        return id;
+    }
+
+     void setId(int id) {
+        this.id = id;
+    }
+
+     boolean isEmpty() {
+        return size == 0;
+    }
+
+     int getMaximumEntries() {
+        return this.maximumEntries;
+    }
+
+     void setMaximumEntries(int maximumEntries) {
+        this.maximumEntries = maximumEntries;
+    }
+
+     int size() {
+        return size;
+    }
+
+     HashPageInfo addHashPageInfo(long id, int size) throws IOException {
+        HashPageInfo info = new HashPageInfo(hashIndex);
+        info.setId(id);
+        info.setSize(size);
+        if (root == null) {
+            root=info;
+        }else {
+            tail.linkAfter(info);
+        }
+        tail=info;
+        this.numberOfPages++;
+        this.size += size;
+        return info;
+    }
+
+     public HashEntry find(HashEntry key) throws IOException {
+        HashEntry result = null;
+        try {
+            int low = 0;
+            int high = size()-1;
+            while (low <= high) {
+                int mid = (low + high) >> 1;
+                HashEntry te = getHashEntry(mid);
+                int cmp = te.compareTo(key);
+                if (cmp == 0) {
+                    result = te;
+                    break;
+                } else if (cmp < 0) {
+                    low = mid + 1;
+                } else {
+                    high = mid - 1;
+                }
+            }
+        } finally {
+            end();
+        }
+        return result;
+    }
+
+     boolean put(HashEntry newEntry) throws IOException {
+        boolean replace = false;
+        try {
+            int low = 0;
+            int high = size()-1;
+            while (low <= high) {
+                int mid = (low + high) >> 1;
+                HashEntry midVal = getHashEntry(mid);
+                int cmp = midVal.compareTo(newEntry);
+                if (cmp < 0) {
+                    low = mid + 1;
+                } else if (cmp > 0) {
+                    high = mid - 1;
+                } else {
+                    replace = true;
+                    midVal.setIndexOffset(newEntry.getIndexOffset());
+                    break;
+                }
+            }
+            if (!replace) {
+                addHashEntry(low, newEntry);
+                size++;
+            }
+        } finally {
+            end();
+        }
+        return replace;
+    }
+
+     HashEntry remove(HashEntry entry) throws IOException {
+        HashEntry result = null;
+        try {
+            int low = 0;
+            int high = size() - 1;
+            while (low <= high) {
+                int mid = (low + high) >> 1;
+                HashEntry te = getHashEntry(mid);
+                int cmp = te.compareTo(entry);
+                if (cmp == 0) {
+                    result = te;
+                    removeHashEntry(mid);
+                    size--;
+                    break;
+                } else if (cmp < 0) {
+                    low = mid + 1;
+                } else {
+                    high = mid - 1;
+                }
+            }
+        } finally {
+            end();
+        }
+        return result;
+    }
+
+    private void addHashEntry(int index, HashEntry entry) throws IOException {
+        HashPageInfo pageToUse = null;
+        int offset = 0;
+        if (index >= getMaximumBinSize()) {
+            while(index >= getMaximumBinSize()) {
+                HashPage hp = hashIndex.createPage(id);
+                pageToUse = addHashPageInfo(hp.getOffset(), 0);
+                pageToUse.setPage(hp);
+            }
+            offset = 0;
+        } else {
+            int count = 0;
+            int countSoFar=0;
+            int pageNo = 0;
+            HashPageInfo page = root;
+            while (page != null) {
+                count += page.size();
+                pageToUse=page;
+                if (index < count ) {
+                    offset = index - countSoFar;
+                    break;
+                }
+                if (index == count && page.size()+1 <= maximumEntries) {
+                    offset = page.size();
+                    break;
+                }
+                countSoFar += page.size();
+                pageNo++;
+                page = (HashPageInfo) page.getNext();
+            }
+            while(pageNo >= this.numberOfPages) {
+                HashPage hp  = hashIndex.createPage(id);
+                pageToUse = addHashPageInfo(hp.getOffset(), 0);               
+            }            
+        }
+        pageToUse.begin();  
+        pageToUse.addHashEntry(offset, entry);
+        doOverFlow(index);
+    }
+
+    private HashEntry removeHashEntry(int index) throws IOException {
+        HashPageInfo page = getRetrievePage(index);
+        int offset = getRetrieveOffset(index);
+        HashEntry result = page.removeHashEntry(offset);
+        
+        if (page.isEmpty()) {
+            if (root.equals(page)) {
+                root=(HashPageInfo) root.getNext();
+            }
+            if (tail.equals(page)) {
+                tail=(HashPageInfo) page.getPrevious();
+            }
+            page.unlink();
+            this.numberOfPages--;
+            hashIndex.releasePage(page.getPage());
+        }
+        doUnderFlow(index);
+        return result;
+    }
+
+    private HashEntry getHashEntry(int index) throws IOException {
+        HashPageInfo page = getRetrievePage(index);
+        page.begin();
+        int offset = getRetrieveOffset(index);
+        HashEntry result = page.getHashEntry(offset);
+        return result;
+    }
+    
+
+    private int getMaximumBinSize() {
+        return maximumEntries * this.numberOfPages;
+    }
+
+    private HashPageInfo getRetrievePage(int index) throws IOException {
+        HashPageInfo result = null;
+        int count = 0;
+        HashPageInfo page = root;
+        while (page != null) {
+            count += page.size();
+            result = page;
+            if (index < count) {
+                break;
+            }
+            page = (HashPageInfo) page.getNext();
+        }
+        
+        result.begin();
+        return result;
+    }
+
+    private int getRetrieveOffset(int index) throws IOException {
+        int result = 0;
+        int count = 0;
+        HashPageInfo page = root;
+        while (page != null) {
+            if ((index + 1) <= (count + page.size())) {
+                result = index - count;
+                break;
+            }
+            count += page.size();
+            page = (HashPageInfo) page.getNext();
+        }
+        return result;
+    }
+
+    private void doOverFlow(int index) throws IOException {
+        HashPageInfo info = getRetrievePage(index);
+        if (info.size() > maximumEntries) {
+            // overflowed
+            info.begin();
+            HashEntry entry = info.removeHashEntry(info.size() - 1);
+            doOverFlow(getNextPage(info), entry);
+        }
+    }
+
+    private void doOverFlow(HashPageInfo next, HashEntry entry) throws IOException {
+        HashPageInfo info = null;
+        if (next == null) {
+            HashPage page = hashIndex.createPage(id);
+            info = addHashPageInfo(page.getOffset(), 0);
+            info.setPage(page);
+        } else {
+            info = next;
+        }
+        info.begin();
+        info.addHashEntry(0, entry);
+        if (info.size() > maximumEntries) {
+            // overflowed
+            HashEntry overflowed = info.removeHashEntry(info.size() - 1);
+            doOverFlow(getNextPage(info), overflowed);
+        }
+    }
+    
+    private HashPageInfo getNextPage(HashPageInfo start) {
+        return (HashPageInfo) start.getNext();
+    }
+
+    private void doUnderFlow(int index) {
+    }
+
+    String dump() throws IOException {
+        String str = "[" + this.numberOfPages+"]";
+        HashPageInfo page = root;
+        while (page != null) {
+            page.begin();
+            str +=page.dump();
+            page.end();
+            page = (HashPageInfo) page.getNext();
+        }
+        return str;
+    }
+    private void end() throws IOException {
+        HashPageInfo page = root;
+        while (page != null) {
+            page.end();
+            page = (HashPageInfo) page.getNext();
+        }
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashEntry.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashEntry.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashEntry.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * Key and index offset for a disk-based hash index
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashEntry implements Comparable {
+
+    static final int NOT_SET = -1;
+    private Comparable key;
+    private long indexOffset;
+
+    public int compareTo(Object o) {
+        if (o instanceof HashEntry) {
+            HashEntry other = (HashEntry)o;
+            return key.compareTo(other.key);
+        } else {
+            return key.compareTo(o);
+        }
+    }
+
+    public boolean equals(Object o) {
+        return compareTo(o) == 0;
+    }
+
+    public int hashCode() {
+        return key.hashCode();
+    }
+
+    public String toString() {
+        return "HashEntry(" + key + "," + indexOffset + ")";
+    }
+
+    HashEntry copy() {
+        HashEntry copy = new HashEntry();
+        copy.key = this.key;
+        copy.indexOffset = this.indexOffset;
+        return copy;
+    }
+
+    /**
+     * @return the key
+     */
+    Comparable getKey() {
+        return this.key;
+    }
+
+    /**
+     * @param key the key to set
+     */
+    void setKey(Comparable key) {
+        this.key = key;
+    }
+
+    /**
+     * @return the indexOffset
+     */
+    long getIndexOffset() {
+        return this.indexOffset;
+    }
+
+    /**
+     * @param indexOffset the indexOffset to set
+     */
+    void setIndexOffset(long indexOffset) {
+        this.indexOffset = indexOffset;
+    }
+
+    void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        dataOut.writeLong(indexOffset);
+        keyMarshaller.writePayload(key, dataOut);
+    }
+
+    void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+        indexOffset = dataIn.readLong();
+        key = (Comparable)keyMarshaller.readPayload(dataIn);
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,967 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LRUCache;
+
+/**
+ * Disk-based hash index implementation
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex implements Index, HashIndexMBean {
+    
+    
+    public static final int DEFAULT_PAGE_SIZE;
+    public static final int DEFAULT_KEY_SIZE;
+    public static final int DEFAULT_BIN_SIZE;
+    public static final int MAXIMUM_CAPACITY;
+    public static final int DEFAULT_LOAD_FACTOR;
+    private static final int LOW_WATER_MARK=1024*16;
+    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=100;
+    // Recovery header is (long offset) + (int data_size) 
+    private static final int RECOVERY_HEADER_SIZE=12;  
+    
+    private static final String NAME_PREFIX = "hash-index-";
+    private static final Log LOG = LogFactory.getLog(HashIndex.class);
+    private final String name;
+    private File directory;
+    private File file;
+    private RandomAccessFile readIndexFile;
+    private RandomAccessFile writeIndexFile;
+    private IndexManager indexManager;
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int keySize = DEFAULT_KEY_SIZE;
+    private int numberOfBins = DEFAULT_BIN_SIZE;
+    private int keysPerPage = this.pageSize /this.keySize;
+    private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
+    private DataByteArrayInputStream dataIn;
+    private byte[] readBuffer;
+    private HashBin[] bins;
+    private Marshaller keyMarshaller;
+    private long length;
+    private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
+    private AtomicBoolean loaded = new AtomicBoolean();
+    private LRUCache<Long, HashPage> pageCache;
+    
+    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
+    private int pageCacheSize = 10;
+    private int size;
+    private int highestSize=0;
+    private int activeBins;
+    private int threshold;
+    private int maximumCapacity=MAXIMUM_CAPACITY;
+    private int loadFactor=DEFAULT_LOAD_FACTOR;
+    
+    private AtomicLong nextTxid = new AtomicLong();
+    
+    
+    /**
+     * Constructor
+     * 
+     * @param directory
+     * @param name
+     * @param indexManager
+     * @throws IOException
+     */
+    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
+        this.directory = directory;
+        this.name = name;
+        this.indexManager = indexManager;
+        openIndexFile();
+        pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller
+     */
+    public synchronized void setKeyMarshaller(Marshaller marshaller) {
+        this.keyMarshaller = marshaller;
+    }
+
+    /**
+     * @return the keySize
+     */
+    public synchronized int getKeySize() {
+        return this.keySize;
+    }
+
+    /**
+     * @param keySize the keySize to set
+     */
+    public synchronized void setKeySize(int keySize) {
+        if (loaded.get()) {
+            throw new RuntimeException("Pages already loaded - can't reset key size");
+        }
+        this.keySize = keySize;
+    }
+
+    /**
+     * @return the pageSize
+     */
+    public synchronized int getPageSize() {
+        return this.pageSize;
+    }
+
+    /**
+     * @param pageSize the pageSize to set
+     */
+    public synchronized void setPageSize(int pageSize) {
+        if (loaded.get() && pageSize != this.pageSize) {
+            throw new RuntimeException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize = pageSize;
+    }
+    
+    /**
+     * @return number of bins
+     */
+    public int getNumberOfBins() {
+        return this.numberOfBins;
+    }
+
+    /**
+     * @param numberOfBins
+     */
+    public void setNumberOfBins(int numberOfBins) {
+        if (loaded.get() && numberOfBins != this.numberOfBins) {
+            throw new RuntimeException("Pages already loaded - can't reset bin size");
+        }
+        this.numberOfBins = numberOfBins;
+    }
+
+    /**
+     * @return the enablePageCaching
+     */
+    public synchronized boolean isEnablePageCaching() {
+        return this.enablePageCaching;
+    }
+
+    /**
+     * @param enablePageCaching the enablePageCaching to set
+     */
+    public synchronized void setEnablePageCaching(boolean enablePageCaching) {
+        this.enablePageCaching = enablePageCaching;
+    }
+
+    /**
+     * @return the pageCacheSize
+     */
+    public synchronized int getPageCacheSize() {
+        return this.pageCacheSize;
+    }
+
+    /**
+     * @param pageCacheSize the pageCacheSize to set
+     */
+    public synchronized void setPageCacheSize(int pageCacheSize) {
+        this.pageCacheSize = pageCacheSize;
+        pageCache.setMaxCacheSize(pageCacheSize);
+    }
+
+    public synchronized boolean isTransient() {
+        return false;
+    }
+    
+    /**
+     * @return the threshold
+     */
+    public int getThreshold() {
+        return threshold;
+    }
+
+    /**
+     * @param threshold the threshold to set
+     */
+    public void setThreshold(int threshold) {
+        this.threshold = threshold;
+    }
+
+    /**
+     * @return the loadFactor
+     */
+    public int getLoadFactor() {
+        return loadFactor;
+    }
+
+    /**
+     * @param loadFactor the loadFactor to set
+     */
+    public void setLoadFactor(int loadFactor) {
+        this.loadFactor = loadFactor;
+    }
+    
+    /**
+     * @return the maximumCapacity
+     */
+    public int getMaximumCapacity() {
+        return maximumCapacity;
+    }
+
+    /**
+     * @param maximumCapacity the maximumCapacity to set
+     */
+    public void setMaximumCapacity(int maximumCapacity) {
+        this.maximumCapacity = maximumCapacity;
+    }
+    
+    public synchronized int getSize() {
+        return size;
+    }
+    
+    public synchronized int getActiveBins(){
+        return activeBins;
+    }
+
+    public synchronized void load() {
+        if (loaded.compareAndSet(false, true)) {
+            int capacity = 1;
+            while (capacity < numberOfBins) {
+                capacity <<= 1;
+            }
+            this.bins = new HashBin[capacity];
+            threshold = calculateThreashold();
+            keysPerPage = pageSize / keySize;
+            recoveryBufferSize = (pageSize+RECOVERY_HEADER_SIZE) * MAX_PAGES_IN_RECOVERY_BUFFER;
+            length=recoveryBufferSize;
+            dataIn = new DataByteArrayInputStream();
+            readBuffer = new byte[pageSize];
+            try {
+                openIndexFile();
+                if (readIndexFile.length() > 0) {
+                    doCompress();
+                }
+            } catch (IOException e) {
+                LOG.error("Failed to load index ", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }    
+
+    public synchronized void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            checkpoint();
+            try {
+                stopWriter();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+            
+            LOG.debug("Index has "+freeList.size()+" free pages.");
+            if (readIndexFile != null) {
+                readIndexFile.close();
+                readIndexFile = null;
+                writeIndexFile.close();
+                writeIndexFile=null;
+                freeList.clear();
+                pageCache.clear();
+                synchronized(writes) {
+                    writes.clear();
+                }
+                bins = new HashBin[bins.length];
+            }
+        }
+    }
+
+    public synchronized void store(Object key, StoreEntry value) throws IOException {
+        load();
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        entry.setIndexOffset(value.getOffset());
+        if (!getBin(key).put(entry)) {
+            this.size++;
+        }
+        if (this.size >= this.threshold) {
+            resize(2*bins.length);
+        }
+        if(this.size > this.highestSize) {
+            this.highestSize=this.size;
+        }
+    }
+
+    public synchronized StoreEntry get(Object key) throws IOException {
+        load();
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        HashEntry result = getBin(key).find(entry);
+        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
+    }
+
+    public synchronized StoreEntry remove(Object key) throws IOException {
+        load();
+        StoreEntry result = null;
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        HashEntry he = getBin(key).remove(entry);
+        if (he != null) {
+            this.size--;
+            result = this.indexManager.getIndex(he.getIndexOffset());
+        }
+        if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size *2)) {
+            int newSize = this.size/this.keysPerPage;
+            newSize = Math.max(128, newSize);
+            this.highestSize=0;
+            resize(newSize);
+            
+        }
+        return result;
+    }
+
+    public synchronized boolean containsKey(Object key) throws IOException {
+        return get(key) != null;
+    }
+
+    public synchronized void clear() throws IOException {
+        unload();
+        delete();
+        openIndexFile();
+        load();
+    }
+
+    public synchronized void delete() throws IOException {
+        unload();
+        if (file.exists()) {
+            file.delete();
+        }
+        length = 0;
+    }
+
+    HashPage lookupPage(long pageId) throws IOException {
+        HashPage result = null;
+        if (pageId >= 0) {
+            result = getFromCache(pageId);
+            if (result == null) {
+                result = getFullPage(pageId);
+                if (result != null) {
+//                    if (result.isActive()) {
+                        addToCache(result);
+//                    } else {
+//                        throw new IllegalStateException("Trying to access an inactive page: " + pageId);
+//                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    HashPage createPage(int binId) throws IOException {
+        HashPage result = getNextFreePage();
+        if (result == null) {  
+            // allocate one
+            result = new HashPage(this);
+            result.setOffset(length);
+            length += pageSize;
+        }
+        result.setBinId(binId);
+        addToCache(result);
+        return result;
+    }
+
+    void releasePage(HashPage page) throws IOException {
+        removeFromCache(page);
+        page.deactivate();
+        writeFullPage(page);
+        freeList.add(page);
+    }
+
+    private HashPage getNextFreePage() throws IOException {
+        HashPage result = null;
+        if(!freeList.isEmpty()) {
+            result = freeList.removeFirst();
+        }
+        return result;
+    }
+
+    HashPage getFullPage(long id) throws IOException {
+        long rl = readIndexFile.length();
+        if( id >= rl ) {
+            // It's a new page.
+            HashPage page = new HashPage(this);
+            page.setOffset(id);
+            return page;
+        }
+
+        LOG.debug("Reading page: "+id);
+        readIndexFile.seek(id);
+        readIndexFile.readFully(readBuffer, 0, pageSize);
+        dataIn.restart(readBuffer);
+        HashPage page = new HashPage(this);
+        page.setOffset(id);
+        page.read(keyMarshaller, dataIn);
+        return page;
+    }
+
+    HashPage getPageHeader(long id) throws IOException {
+        readIndexFile.seek(id);
+        readIndexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+        dataIn.restart(readBuffer);
+        HashPage page = new HashPage(this);
+        page.setOffset(id);
+        page.readHeader(dataIn);
+        return page;
+    }
+
+    void addToBin(HashPage page) throws IOException {
+        int index = page.getBinId();
+        if (index >= numberOfBins) {
+            HashBin[] newBins = new HashBin[index+1];
+            System.arraycopy(this.bins, 0, newBins, 0, this.bins.length);
+            this.bins=newBins;
+        }
+        HashBin bin = getBin(index);
+        bin.addHashPageInfo(page.getOffset(), page.getPersistedSize());
+    }
+
+    private HashBin getBin(int index) {
+        
+        HashBin result = bins[index];
+        if (result == null) {
+            result = new HashBin(this, index, pageSize / keySize);
+            bins[index] = result;
+            activeBins++;
+        }
+        return result;
+    }
+
+    private void openIndexFile() throws IOException {
+        if (readIndexFile == null) {
+            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
+            IOHelper.mkdirs(file.getParentFile());
+            writeIndexFile = new RandomAccessFile(file, "rw");
+            readIndexFile = new RandomAccessFile(file, "r");
+            startWriter();
+        }
+    }
+    
+    private HashBin getBin(Object key) {
+        int hash = hash(key);
+        int i = indexFor(hash, bins.length);
+        return getBin(i);
+    }
+    
+    private void doLoad() throws IOException {
+        if (loaded.compareAndSet(false, true)) {
+
+            LOG.debug("Recovering index");
+            long offset = recoveryBufferSize;
+            nextTxid.set( redoRecoveryUpdates()+1 );
+            LOG.debug("Last written transaction id: "+(nextTxid.get()-1));
+            LOG.debug("Initial page offset: "+offset);
+            int pageCounter=0;
+            while ((offset + pageSize) <= readIndexFile.length()) {
+                readIndexFile.seek(offset);
+                readIndexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+                dataIn.restart(readBuffer);
+                HashPage page = new HashPage(this);
+                page.setOffset(offset);
+                page.readHeader(dataIn);
+                if (!page.isActive()) {
+                    page.reset();
+                    freeList.add(page);
+                } else {
+                    addToBin(page);
+                    size+=page.size();
+                }
+                offset += pageSize;
+                pageCounter++;
+            }
+            length=offset;
+            LOG.debug("Index contains: "+pageCounter+" pages, of those: "+freeList.size()+" are free");
+            LOG.debug("Next new page offset: "+offset);
+            
+        }
+    }
+    
+    private void doCompress() throws IOException {
+        String backFileName = name + "-COMPRESS";
+        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+        backIndex.setKeyMarshaller(keyMarshaller);
+        backIndex.setKeySize(getKeySize());
+        backIndex.setNumberOfBins(getNumberOfBins());
+        backIndex.setPageSize(getPageSize());
+        backIndex.load();
+        File backFile = backIndex.file;
+        long offset = recoveryBufferSize;
+        while ((offset + pageSize) <= readIndexFile.length()) {
+            readIndexFile.seek(offset);
+            HashPage page = getFullPage(offset);
+            if (page.isActive()) {
+                for (HashEntry entry : page.getEntries()) {
+                    backIndex.getBin(entry.getKey()).put(entry);
+                    backIndex.size++;
+                }
+            }
+            page=null;
+            offset += pageSize;
+        }
+        backIndex.unload();
+      
+        unload();
+        IOHelper.deleteFile(file);
+        IOHelper.copyFile(backFile, file);
+        IOHelper.deleteFile(backFile);
+        openIndexFile();
+        doLoad();
+    }
+    
+    private void resize(int newCapacity) throws IOException {
+        if (bins.length < getMaximumCapacity()) {
+            if (newCapacity != numberOfBins) {
+                int capacity = 1;
+                while (capacity < newCapacity) {
+                    capacity <<= 1;
+                }
+                
+                // Make sure everything is read.
+                checkpoint();
+                
+                newCapacity=capacity;
+                if (newCapacity != numberOfBins) {
+                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
+                    
+                    String backFileName = name + "-RESIZE";
+                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+                    backIndex.setKeyMarshaller(keyMarshaller);
+                    backIndex.setKeySize(getKeySize());
+                    backIndex.setNumberOfBins(newCapacity);
+                    backIndex.setPageSize(getPageSize());
+                    backIndex.load();
+                    File backFile = backIndex.file;
+                    long offset = recoveryBufferSize;
+                    while ((offset + pageSize) <= readIndexFile.length()) {
+                        readIndexFile.seek(offset);
+                        HashPage page = getFullPage(offset);
+                        if (page.isActive()) {
+                            for (HashEntry entry : page.getEntries()) {
+                                backIndex.getBin(entry.getKey()).put(entry);
+                                backIndex.size++;
+                            }
+                        }
+                        page=null;
+                        offset += pageSize;
+                    }
+                    backIndex.unload();
+                  
+                    unload();
+                    IOHelper.deleteFile(file);
+                    IOHelper.copyFile(backFile, file);
+                    IOHelper.deleteFile(backFile);
+                    setNumberOfBins(newCapacity);
+                    bins = new HashBin[newCapacity];
+                    threshold = calculateThreashold();
+                    openIndexFile();
+                    doLoad();
+                }
+            }
+        }else {
+            threshold = Integer.MAX_VALUE;
+            return;
+        }
+    }
+    
+    private int calculateThreashold() {
+        return (int)(bins.length * loadFactor);
+    }
+    
+    
+    public String toString() {
+        String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
+        return str;
+    }
+      
+
+    static int hash(Object x) {
+        int h = x.hashCode();
+        h += ~(h << 9);
+        h ^= h >>> 14;
+        h += h << 4;
+        h ^= h >>> 10;
+        return h;
+    }
+
+    static int indexFor(int h, int length) {
+        return h & (length - 1);
+    }
+
+    static {
+        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
+        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
+        DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+        MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
+        DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
+    }
+
+    public int getKeysPerPage() {
+        return keysPerPage;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Double write implementation follows...
+    ///////////////////////////////////////////////////////////////////
+    
+    class PageWrite {
+        final HashPage page;
+        ByteSequence current;
+        ByteSequence diskBound;
+
+        public PageWrite(HashPage page, ByteSequence data) {
+            this.page = page;
+            this.current = data;
+        }
+        
+        public void setCurrent(ByteSequence data) {
+            current=data;
+        }
+
+        void begin() {
+            diskBound = current;
+            current = null;
+        }
+        
+        /**
+         * @return true if there are no pending writes to do.
+         */
+        boolean done() {
+            diskBound=null;
+            return current == null;
+        }
+
+    }
+    
+    private LinkedHashMap<Long, PageWrite> writes=new LinkedHashMap<Long, PageWrite>();
+    private Thread writerThread;
+    AtomicBoolean stopWriter = new AtomicBoolean();
+    private CountDownLatch checkpointLatch;
+    
+    void writeFullPage(HashPage page) throws IOException {
+        DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
+        page.setTxId(nextTxid.get());
+        page.write(keyMarshaller, dataOut);
+        if (dataOut.size() > pageSize) {
+            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+        }
+        
+//        ByteSequence data = new ByteSequence(dataOut.getData(), 0, dataOut.size());
+        ByteSequence data = new ByteSequence(dataOut.getData(), 0, pageSize);
+        Long key = page.getOffset();
+
+        LOG.debug("Page write request at: "+page.getOffset()+", keys: ");
+        synchronized( writes ) {
+            // If it's not in the write cache...
+            PageWrite write = writes.get(key);
+            if( write==null ) {
+                write = new PageWrite(page, data);
+                writes.put(key, write);
+            } else {
+                write.setCurrent(data);
+            }
+            writes.notify();
+        }
+    }
+
+    
+    /**
+     * Waits up to the given timeout for buffered page writes and flushes one batch to disk.
+     * 
+     * @param timeout
+     * @param unit
+     * @return true if a write was done.
+     * @throws IOException 
+     */
+    public boolean pollWrites(long timeout, TimeUnit unit) throws IOException {
+        
+        int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
+        ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
+        
+        synchronized( writes ) {            
+            // If there is nothing to write, wait for a notification...
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+                try {
+                    writes.wait(unit.toMillis(timeout));
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException();
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+                return false;
+            }
+            
+              
+            // build a write batch from the current write cache. 
+            for (PageWrite write : writes.values()) {
+                
+                int l = write.current.length+RECOVERY_HEADER_SIZE;
+                
+                // Will it fit in the batch???
+                if( batchLength + l > recoveryBufferSize ) {
+                    break; // nope.. stop adding to the batch.
+                }
+                
+                batch.add(write);
+                batchLength +=l;
+                
+                // Move the current write to the diskBound write, this lets folks update the 
+                // page again without blocking for this write.
+                write.begin();
+            }
+        }
+       long txId = nextTxid.get();
+        
+       LOG.debug("Starting write batch transaction id: "+txId);
+       
+        // Now the batch array has all the writes, write the batch to the recovery buffer.
+        writeIndexFile.seek(0);
+        writeIndexFile.writeLong(txId); // write txid of the batch
+        writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
+        for (PageWrite w : batch) {
+            LOG.debug("Journaling write at offset: "+w.page.getOffset());
+            writeIndexFile.writeLong(w.page.getOffset());
+            writeIndexFile.writeInt(w.diskBound.length);
+            writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+        }
+        
+        // Sync to make sure recovery buffer writes land on disk..
+        writeIndexFile.getFD().sync(); 
+        
+        // Now update the actual index locations
+        for (PageWrite w : batch) {
+            LOG.debug("Doing write at offset: "+w.page.getOffset());
+            writeIndexFile.seek(w.page.getOffset());
+            writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+        }
+        
+        // Sync again
+        writeIndexFile.getFD().sync();
+        
+        nextTxid.incrementAndGet();
+        synchronized( writes ) {
+            for (PageWrite w : batch) {
+                // If there are no more pending writes, then remove it from the write cache.
+                if( w.done() ) {
+                    writes.remove(w.page.getOffset());
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+            }
+        }
+        return true;
+    }
+
+    private void releaseCheckpointWaiter() {
+        if( checkpointLatch!=null ) {
+            checkpointLatch.countDown();
+            checkpointLatch=null;
+        }
+    }
+    
+    /**
+     * Flushes all write buffers to disk and returns the transaction id of the last write done to disk. The
+     * transaction id can be used for recovery purposes since it is always incrementing.
+     * 
+     * @return the transaction id of the last write flushed to disk
+     * @throws IOException 
+     */
+    public long checkpoint() throws IOException {
+        
+        if( stopWriter.get() ) {
+            throw new IOException("Index already stopped, you cannot checkpoint.");
+        }
+        
+        // Setup a latch that gets notified when all buffered writes hits the disk.
+        CountDownLatch checkpointLatch;
+        synchronized( writes ) {
+            if( writes.isEmpty()) {                
+                return nextTxid.get()-1;
+            }
+            if( this.checkpointLatch == null ) {
+                this.checkpointLatch = new CountDownLatch(1);
+            }
+            checkpointLatch = this.checkpointLatch;
+        }        
+        try {
+            checkpointLatch.await();        
+            return nextTxid.get()-1;
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+    }
+    
+    
+    /**
+     * @return the last transaction id successfully written to disk.
+     * @throws IOException
+     */
+    private long redoRecoveryUpdates() throws IOException {
+        
+        if( readIndexFile.length() <12 ) {
+            return 0;
+        }
+
+        // How many recovery records do we have in the recovery buffer?
+        readIndexFile.seek(0);
+        long rc = readIndexFile.readLong();
+        int recordCounter = readIndexFile.readInt();
+        
+        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
+        try {
+            for (int i = 0; i < recordCounter; i++) {
+                long offset = readIndexFile.readLong();
+                int dataLength = readIndexFile.readInt();
+                if( dataLength > pageSize ) {
+                    // Invalid recovery record, data was larger than the page size. Probably due to a partial write to the recovery buffer
+                    return rc-1;
+                }
+                byte []data = new byte[dataLength];
+                if( readIndexFile.read(data, 0, dataLength) != dataLength ) {
+                    // Invalid recovery record, could not fully read the data. Probably due to a partial write to the recovery buffer
+                    return rc-1;
+                }
+                batch.put(offset, data);
+            }
+        } catch (IllegalStateException e) {
+            // If an error occurred it was because the redo buffer was not fully written out, so don't redo it; the index should still be consistent.
+            return rc-1;
+        }
+        
+        // We need to detect whether the recovery buffer write was fully synced to disk. If it was, we should see some of its partial writes in the index.
+        // If we don't see any writes (even partial ones) in the index, then we consider the recovery buffer to be the one that was partially written.
+        // FYI: This depends on the header changing on every write. It does because the header contains a txid which changes on every write.
+        boolean redoNeeded = false;
+        byte header[] = new byte[HashPage.PAGE_HEADER_SIZE]; 
+        byte header2[] = new byte[HashPage.PAGE_HEADER_SIZE]; 
+        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
+            try {
+                
+                readIndexFile.seek(e.getKey());
+                readIndexFile.readFully(header);
+                
+                System.arraycopy(e.getValue(), 0, header2, 0, HashPage.PAGE_HEADER_SIZE);
+                if( Arrays.equals(header, header2) ) {
+                    redoNeeded = true;
+                    break;
+                }
+                
+            } catch (IOException ignore) {
+                // not all records may have been written..
+            }
+        }
+
+        // Stop here if we don't need to redo...
+        if( !redoNeeded ) {
+            return rc-1;
+        }
+        
+        
+        // Re-apply all the writes in the recovery buffer.
+        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
+            writeIndexFile.seek(e.getKey());
+            writeIndexFile.write(e.getValue());
+        }
+        
+        // And sync it to disk
+        writeIndexFile.getFD().sync(); 
+        return rc;
+    }
+
+    private HashPage getFromCache(long pageId) {
+        HashPage result = null;
+        if (enablePageCaching) {
+            result = pageCache.get(pageId);
+        }
+        if( result == null ) {
+            synchronized(writes) {
+                PageWrite pageWrite = writes.get(pageId);
+                if( pageWrite != null ) {
+                    result = pageWrite.page;
+                }
+            }
+            if (enablePageCaching && result != null) {
+                pageCache.put(pageId, result);
+            }
+        }
+        return result;
+    }
+
+    private void addToCache(HashPage page) {
+        if (enablePageCaching) {
+            pageCache.put(page.getOffset(), page);
+        }
+    }
+
+    private void removeFromCache(HashPage page) {
+        if (enablePageCaching) {
+            pageCache.remove(page.getOffset());
+        }
+    }
+
+    public void startWriter() {
+        stopWriter.set(false);
+        writerThread = new Thread("Index Writer") {
+            @Override
+            public void run() {
+                try {
+                    while( !stopWriter.get() ) {
+                        pollWrites(500, TimeUnit.MILLISECONDS);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } finally {
+                    releaseCheckpointWaiter();
+                }
+            }
+        };
+        writerThread.start();
+    }
+ 
+    public void stopWriter() throws InterruptedException {
+        stopWriter.set(true);
+        writerThread.join();
+    }
+
+}
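
The recovery-buffer logic in pollWrites() and redoRecoveryUpdates() above amounts to a classic double write: each batch is first journalled into a fixed region at the head of the index file and synced, then written to the pages' real offsets and synced again, so a crash during either phase leaves at least one complete copy. A simplified, standalone sketch of that ordering (hypothetical helper, not the committed code):

    // Simplified illustration of the double-write ordering used by pollWrites();
    // names and layout here are illustrative, not the committed implementation.
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Map;

    class DoubleWriteSketch {
        static void writeBatch(RandomAccessFile file, long txId,
                               Map<Long, byte[]> pages) throws IOException {
            // Phase 1: journal the whole batch into the recovery region at offset 0.
            file.seek(0);
            file.writeLong(txId);                    // batch transaction id
            file.writeInt(pages.size());             // recovery record counter
            for (Map.Entry<Long, byte[]> e : pages.entrySet()) {
                file.writeLong(e.getKey());          // destination page offset
                file.writeInt(e.getValue().length);  // data size
                file.write(e.getValue());            // page image
            }
            file.getFD().sync();                     // the recovery copy is now durable

            // Phase 2: apply the same data at the real page offsets.
            for (Map.Entry<Long, byte[]> e : pages.entrySet()) {
                file.seek(e.getKey());
                file.write(e.getValue());
            }
            file.getFD().sync();                     // the index pages are now durable
        }
    }

On restart, redoRecoveryUpdates() re-reads that region and re-applies it only when a matching partial write is detected in the index, which is why checkpoint() can simply wait for the write cache to drain.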

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndexMBean.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndexMBean.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndexMBean.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndexMBean.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import org.apache.kahadb.IndexMBean;
+
+/**
+ * MBean for HashIndex
+ *
+ */
+public interface HashIndexMBean extends IndexMBean{
+   
+    /**
+     * @return the keySize
+     */
+    public int getKeySize();
+
+    /**
+     * @param keySize the keySize to set
+     */
+    public void setKeySize(int keySize);
+
+    
+    /**
+     * @return the page size
+     */
+    public int getPageSize();
+
+        
+    /**
+     * @return number of bins
+     */
+    public int getNumberOfBins();
+
+
+    /**
+     * @return the enablePageCaching
+     */
+    public boolean isEnablePageCaching();
+
+    
+    /**
+     * @return the pageCacheSize
+     */
+    public int getPageCacheSize();
+
+    /**
+     * @return size
+     */
+    public int getSize();
+    
+    /**
+     * @return the number of active bins
+     */
+    public int getActiveBins();
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * A page within a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPage {
+    static final int PAGE_HEADER_SIZE = 17;
+//    private static final transient Log LOG = LogFactory.getLog(HashPage.class);
+
+    private final HashIndex index;
+    private long offset;
+
+    // The page record persists the following fields
+    private int binId;
+    private int persistedSize;
+    private long txId;
+    private List<HashEntry> hashIndexEntries;
+    
+    /**
+     * Constructor
+     * 
+     * @param index
+     */
+    public HashPage(HashIndex index) {
+        this.index = index;
+        this.hashIndexEntries = new ArrayList<HashEntry>(getMaximumEntries());
+    }
+
+    public String toString() {
+        return "HashPage[" + getOffset() + ":" + binId + ":" + offset+"] size = " + persistedSize;
+    }
+
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof HashPage) {
+            HashPage other = (HashPage)o;
+            result = other.offset == offset;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return (int)offset;
+    }
+
+    boolean isActive() {
+        return binId!=HashEntry.NOT_SET && persistedSize>0;
+    }
+
+    void deactivate() {
+        binId=HashEntry.NOT_SET;
+        reset();
+    }
+
+    
+    long getOffset() {
+        return offset;
+    }
+
+    void setOffset(long id) {
+        this.offset = id;
+    }
+
+    int getPersistedSize() {
+        return persistedSize;
+    }
+
+    void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        dataOut.writeInt(binId);
+        dataOut.writeLong(txId);
+        persistedSize=hashIndexEntries.size();
+        dataOut.writeInt(persistedSize);
+        for (HashEntry entry : hashIndexEntries) {
+            entry.write(keyMarshaller, dataOut);
+        }
+    }
+
+    void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+        readHeader(dataIn);
+        hashIndexEntries.clear();
+        for (int i = 0; i < persistedSize; i++) {
+            HashEntry entry = new HashEntry();
+            entry.read(keyMarshaller, dataIn);
+            hashIndexEntries.add(entry);
+        }
+    }
+
+    void readHeader(DataInput dataIn) throws IOException {
+        binId = dataIn.readInt();
+        txId = dataIn.readLong();
+        persistedSize = dataIn.readInt();
+    }
+
+    boolean isEmpty() {
+        return hashIndexEntries.isEmpty();
+    }
+
+    boolean isFull() {
+        return hashIndexEntries.size() >= getMaximumEntries();
+    }
+
+    boolean isUnderflowed() {
+        return hashIndexEntries.size() < (getMaximumEntries() / 2);
+    }
+
+    boolean isOverflowed() {
+        return hashIndexEntries.size() > getMaximumEntries();
+    }
+
+    List<HashEntry> getEntries() {
+        return hashIndexEntries;
+    }
+
+    void setEntries(List<HashEntry> newEntries) {
+        this.hashIndexEntries = newEntries;
+    }
+
+    int getMaximumEntries() {
+        return index.getKeysPerPage();
+    }
+
+    int size() {
+        return hashIndexEntries.size();
+    }
+
+    void reset() {
+        hashIndexEntries.clear();
+        persistedSize=0;
+    }
+
+    void addHashEntry(int index, HashEntry entry) throws IOException {
+        hashIndexEntries.add(index, entry);
+    }
+
+    HashEntry getHashEntry(int index) {
+        HashEntry result = hashIndexEntries.get(index);
+        return result;
+    }
+
+    HashEntry removeHashEntry(int index) throws IOException {
+        HashEntry result = hashIndexEntries.remove(index);
+        return result;
+    }
+
+    void removeAllTreeEntries(List<HashEntry> c) {
+        hashIndexEntries.removeAll(c);
+    }
+
+    List<HashEntry> getSubList(int from, int to) {
+        return new ArrayList<HashEntry>(hashIndexEntries.subList(from, to));
+    }
+
+    /**
+     * @return the binId
+     */
+    int getBinId() {
+        return this.binId;
+    }
+
+    /**
+     * @param binId the binId to set
+     */
+    void setBinId(int binId) {
+        this.binId = binId;
+    }
+
+    String dump() {
+
+        StringBuffer str = new StringBuffer(32);
+        str.append(toString());
+        str.append(": ");
+        for (HashEntry entry : hashIndexEntries) {
+            str.append(entry);
+            str.append(",");
+        }
+        return str.toString();
+    }
+
+    public long getTxId() {
+        return txId;
+    }
+
+    public void setTxId(long txId) {
+        this.txId = txId;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.IOException;
+
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * Holds information about a HashPage within a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPageInfo extends LinkedNode {
+
+    private HashIndex hashIndex;
+    private long id;
+    private int size;
+    private HashPage page;
+    private boolean dirty;
+
+    HashPageInfo(HashIndex index) {
+        this.hashIndex = index;
+    }
+
+    /**
+     * @return the id
+     */
+    long getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the id to set
+     */
+    void setId(long id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the size
+     */
+    int size() {
+        return this.size;
+    }
+    
+    boolean isEmpty() {
+        return size <= 0;
+    }
+
+    /**
+     * @param size the size to set
+     */
+    void setSize(int size) {
+        this.size = size;
+    }
+
+    void addHashEntry(int index, HashEntry entry) throws IOException {
+        page.addHashEntry(index, entry);
+        size=page.size();
+        dirty = true;
+    }
+
+    HashEntry getHashEntry(int index) throws IOException {
+        return page.getHashEntry(index);
+    }
+
+    HashEntry removeHashEntry(int index) throws IOException {
+        HashEntry result = page.removeHashEntry(index);
+        if (result != null) {
+            size=page.size();
+            dirty = true;
+        }
+        return result;
+    }
+
+    String dump() {
+        return page.dump();
+    }
+
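+    // loads the backing HashPage from the index on first use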
+    void begin() throws IOException {
+        if (page == null) {
+            page = hashIndex.lookupPage(id);
+        }
+    }
+
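+    // writes the page back if it was modified, then releases the in-memory reference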
+    void end() throws IOException {
+        if (page != null) {
+            if (dirty) {
+                hashIndex.writeFullPage(page);
+            }
+        }
+        page = null;
+        dirty = false;
+    }
+
+    HashPage getPage() {
+        return page;
+    }
+
+    void setPage(HashPage page) {
+        this.page = page;
+    }
+    
+    public String toString() {
+        return "Page["+id+"] size=" + size;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html Mon Aug 11 10:46:37 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A disk-based hash implementation of an index for a Map.
+
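+<p>
+A minimal usage sketch, mirroring the setup used in the accompanying HashTest
+(the data directory, IndexManager arguments and String key marshaller are taken
+from that test and are not the only valid configuration):
+</p>
+<pre>
+File directory = new File(IOHelper.getDefaultDataDirectory());
+IndexManager indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+HashIndex index = new HashIndex(directory, "testHash", indexManager);
+index.setKeyMarshaller(Store.STRING_MARSHALLER);
+index.load();
+
+// store a value under a String key, then look it up again
+IndexItem value = indexManager.createNewIndex();
+indexManager.storeIndex(value);
+index.store("key:1", value);
+IndexItem found = (IndexItem) index.get("key:1");
+
+index.remove("key:1");
+index.unload();
+</pre>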
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.File;
+
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexBenchmark;
+
+public class HashIndexBenchMark extends IndexBenchmark {
+
+    @Override
+    protected Index createIndex(File root, String name) throws Exception {
+        HashIndex index = new HashIndex(root, name, indexManager);
+        //index.setNumberOfBins(12);
+        //index.setPageSize(32 * 1024);
+        index.setKeyMarshaller(Store.STRING_MARSHALLER);
+        return index;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashTest.java?rev=684838&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashTest.java Mon Aug 11 10:46:37 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.robusthash;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import junit.framework.TestCase;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Test a HashIndex
+ */
+public class HashTest extends TestCase {
+
+    private static final int COUNT = 10000;
+
+    private HashIndex hashIndex;
+
+    private File directory;
+
+    private IndexManager indexManager;
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        directory = new File(IOHelper.getDefaultDataDirectory());
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        indexManager = new IndexManager(directory, "im-hash-test", "rw", null,
+                new AtomicLong());
+        this.hashIndex = new HashIndex(directory, "testHash", indexManager);
+        this.hashIndex.setNumberOfBins(12);
+        this.hashIndex.setPageSize(32 * 1024);
+        this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
+    }
+
+    public void testHashIndex() throws Exception {
+        doTest(300);
+        hashIndex.clear();
+        hashIndex.unload();
+        doTest(600);
+        hashIndex.clear();
+        hashIndex.unload();
+        doTest(128);
+    }
+
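+    // runs a full insert/reload/verify/remove cycle at the given page size,
+    // then mixes removes and re-inserts across reloads before a final verify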
+    public void doTest(int pageSize) throws Exception {
+        String keyRoot = "key:";
+        hashIndex.setPageSize(pageSize);
+        this.hashIndex.load();
+        doInsert(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        checkRetrieve(keyRoot);
+        doRemove(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        doInsert(keyRoot);
+        doRemoveHalf(keyRoot);
+        doInsertHalf(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        checkRetrieve(keyRoot);
+        this.hashIndex.unload();
+    }
+
+    void doInsert(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem value = indexManager.createNewIndex();
+            indexManager.storeIndex(value);
+            hashIndex.store(keyRoot + i, value);
+        }
+    }
+
+    void checkRetrieve(String keyRoot) throws IOException {
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNotNull("Key missing: "+keyRoot + i, item);
+        }
+    }
+
+    void doRemoveHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                hashIndex.remove(keyRoot + i);
+            }
+
+        }
+    }
+
+    void doInsertHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                IndexItem value = indexManager.createNewIndex();
+                indexManager.storeIndex(value);
+                hashIndex.store(keyRoot + i, value);
+            }
+        }
+    }
+
+    void doRemove(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            hashIndex.remove(keyRoot + i);
+        }
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    void doRemoveBackwards(String keyRoot) throws Exception {
+        for (int i = COUNT - 1; i >= 0; i--) {
+            hashIndex.remove(keyRoot + i);
+        }
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        File[] files = directory.listFiles();
+        if (files != null) {
+            for (File file : files) {
+                file.delete();
+            }
+        }
+    }
+}


