activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r677944 [6/11] - in /activemq/sandbox/kahadb: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/impl/ src/main/java/org/apache/kahadb/impl/async/ s...
Date Fri, 18 Jul 2008 15:49:52 GMT
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndex.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndex.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,620 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.hash;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LRUCache;
+
+/**
+ * A hash bucket index: keys are hashed into a power-of-two array of bins and
+ * each bin persists fixed-size pages of entries in a RandomAccessFile on disk.
+ * (The previous summary said "BTree implementation", which did not match this
+ * class.)
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex implements Index, HashIndexMBean {
+    // All five defaults below are resolved from system properties in the
+    // static initializer at the bottom of this class.
+    public static final int DEFAULT_PAGE_SIZE;
+    public static final int DEFAULT_KEY_SIZE;
+    public static final int DEFAULT_BIN_SIZE;
+    public static final int MAXIMUM_CAPACITY;
+    public static final int DEFAULT_LOAD_FACTOR;
+    // Shrinking is only considered once the index has grown past this many
+    // entries - see remove().
+    private static final int LOW_WATER_MARK=1024*16;
+    private static final String NAME_PREFIX = "hash-index-";
+    private static final Log LOG = LogFactory.getLog(HashIndex.class);
+    private final String name;
+    private File directory;
+    private File file;
+    private RandomAccessFile indexFile;
+    private IndexManager indexManager;
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int keySize = DEFAULT_KEY_SIZE;
+    private int numberOfBins = DEFAULT_BIN_SIZE;
+    // How many entries fit on one page; recomputed in load() after the sizes
+    // may have been overridden by the setters.
+    private int keysPerPage = this.pageSize /this.keySize;
+    private DataByteArrayInputStream dataIn;
+    private DataByteArrayOutputStream dataOut;
+    private byte[] readBuffer;
+    private HashBin[] bins;
+    private Marshaller keyMarshaller;
+    // Current length of the index file in bytes; also the offset at which the
+    // next page is allocated - see createPage().
+    private long length;
+    // Pages that were released and can be reused before growing the file.
+    private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
+    // Guards one-shot load/unload transitions; doCompress()/resize() toggle it
+    // via unload() + doLoad().
+    private AtomicBoolean loaded = new AtomicBoolean();
+    private LRUCache<Long, HashPage> pageCache;
+    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
+    private int pageCacheSize = 10;
+    // Number of entries currently stored in the index.
+    private int size;
+    // High water mark of size; remove() uses it to decide when to shrink.
+    private int highestSize=0;
+    private int activeBins;
+    // When size reaches this value the bin array is doubled - see store().
+    private int threshold;
+    private int maximumCapacity=MAXIMUM_CAPACITY;
+    // NOTE(review): used as a multiplier in calculateThreashold() (default 50,
+    // i.e. roughly 50 entries per bin before a resize) - confirm the intended
+    // unit; it is not a fraction like java.util.HashMap's load factor.
+    private int loadFactor=DEFAULT_LOAD_FACTOR;
+    
+    
+    /**
+     * Constructor. Opens (or creates) the backing index file immediately.
+     * 
+     * @param directory directory in which the index file lives
+     * @param name logical index name; used to derive the file name
+     * @param indexManager used to resolve stored offsets back to StoreEntry objects
+     * @throws IOException if the index file cannot be opened
+     */
+    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
+        this.directory = directory;
+        this.name = name;
+        this.indexManager = indexManager;
+        openIndexFile();
+        pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
+    }
+
+    /**
+     * Set the marshaller used to serialize/deserialize key objects
+     * 
+     * @param marshaller
+     */
+    public synchronized void setKeyMarshaller(Marshaller marshaller) {
+        this.keyMarshaller = marshaller;
+    }
+
+    /**
+     * @return the keySize
+     */
+    public synchronized int getKeySize() {
+        return this.keySize;
+    }
+
+    /**
+     * @param keySize the keySize to set
+     */
+    public synchronized void setKeySize(int keySize) {
+        this.keySize = keySize;
+        // NOTE(review): the field is assigned before the loaded check, so a
+        // rejected call still mutates keySize; setPageSize() checks first.
+        // Confirm whether this ordering is intentional.
+        if (loaded.get()) {
+            throw new RuntimeException("Pages already loaded - can't reset key size");
+        }
+    }
+
+    /**
+     * @return the pageSize
+     */
+    public synchronized int getPageSize() {
+        return this.pageSize;
+    }
+
+    /**
+     * @param pageSize the pageSize to set
+     */
+    public synchronized void setPageSize(int pageSize) {
+        if (loaded.get() && pageSize != this.pageSize) {
+            throw new RuntimeException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize = pageSize;
+    }
+    
+    /**
+     * @return number of bins
+     */
+    public int getNumberOfBins() {
+        return this.numberOfBins;
+    }
+
+    /**
+     * @param numberOfBins
+     */
+    public void setNumberOfBins(int numberOfBins) {
+        if (loaded.get() && numberOfBins != this.numberOfBins) {
+            throw new RuntimeException("Pages already loaded - can't reset bin size");
+        }
+        this.numberOfBins = numberOfBins;
+    }
+
+    /**
+     * @return the enablePageCaching
+     */
+    public synchronized boolean isEnablePageCaching() {
+        return this.enablePageCaching;
+    }
+
+    /**
+     * @param enablePageCaching the enablePageCaching to set
+     */
+    public synchronized void setEnablePageCaching(boolean enablePageCaching) {
+        this.enablePageCaching = enablePageCaching;
+    }
+
+    /**
+     * @return the pageCacheSize
+     */
+    public synchronized int getPageCacheSize() {
+        return this.pageCacheSize;
+    }
+
+    /**
+     * @param pageCacheSize the pageCacheSize to set
+     */
+    public synchronized void setPageCacheSize(int pageCacheSize) {
+        this.pageCacheSize = pageCacheSize;
+        pageCache.setMaxCacheSize(pageCacheSize);
+    }
+
+    // This index is always persistent.
+    public synchronized boolean isTransient() {
+        return false;
+    }
+    
+    /**
+     * @return the threshold
+     */
+    public int getThreshold() {
+        return threshold;
+    }
+
+    /**
+     * @param threshold the threshold to set
+     */
+    public void setThreshold(int threshold) {
+        this.threshold = threshold;
+    }
+
+    /**
+     * @return the loadFactor
+     */
+    public int getLoadFactor() {
+        return loadFactor;
+    }
+
+    /**
+     * @param loadFactor the loadFactor to set
+     */
+    public void setLoadFactor(int loadFactor) {
+        this.loadFactor = loadFactor;
+    }
+    
+    /**
+     * @return the maximumCapacity
+     */
+    public int getMaximumCapacity() {
+        return maximumCapacity;
+    }
+
+    /**
+     * @param maximumCapacity the maximumCapacity to set
+     */
+    public void setMaximumCapacity(int maximumCapacity) {
+        this.maximumCapacity = maximumCapacity;
+    }
+    
+    /**
+     * @return the number of entries stored in the index
+     */
+    public synchronized int getSize() {
+        return size;
+    }
+    
+    /**
+     * @return the number of bins that have been materialized so far
+     */
+    public synchronized int getActiveBins(){
+        return activeBins;
+    }
+
+    /**
+     * Initialize in-memory state and (re)load the index from disk. Idempotent:
+     * only the first call after construction/unload does any work.
+     */
+    public synchronized void load() {
+        if (loaded.compareAndSet(false, true)) {
+            // Round the bin count up to the next power of two so indexFor()
+            // can use a bit mask instead of a modulo.
+            int capacity = 1;
+            while (capacity < numberOfBins) {
+                capacity <<= 1;
+            }
+            this.bins = new HashBin[capacity];
+            threshold = calculateThreashold();
+            keysPerPage = pageSize / keySize;
+            dataIn = new DataByteArrayInputStream();
+            dataOut = new DataByteArrayOutputStream(pageSize);
+            readBuffer = new byte[pageSize];
+            try {
+                openIndexFile();
+                // A pre-existing file is compacted on startup; doCompress()
+                // also performs the actual doLoad() of the compacted file.
+                if (indexFile.length() > 0) {
+                    doCompress();
+                }
+            } catch (IOException e) {
+                LOG.error("Failed to load index ", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }    
+
+    /**
+     * Close the index file and drop all cached in-memory state. Idempotent.
+     */
+    public synchronized void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            if (indexFile != null) {
+                indexFile.close();
+                indexFile = null;
+                freeList.clear();
+                pageCache.clear();
+                // Keep the capacity but discard all bin contents.
+                bins = new HashBin[bins.length];
+            }
+        }
+    }
+
+    /**
+     * Store (insert or replace) the offset of the given StoreEntry under key,
+     * doubling the bin array when the size threshold is reached.
+     */
+    public synchronized void store(Object key, StoreEntry value) throws IOException {
+        load();
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        entry.setIndexOffset(value.getOffset());
+        // put() presumably returns true when an existing entry was replaced,
+        // so only genuinely new keys are counted - verify against HashBin.put().
+        if (!getBin(key).put(entry)) {
+            this.size++;
+        }
+        if (this.size >= this.threshold) {
+            resize(2*bins.length);
+        }
+        if(this.size > this.highestSize) {
+            this.highestSize=this.size;
+        }
+    }
+
+    /**
+     * Look up the StoreEntry stored under key, or null if the key is absent.
+     */
+    public synchronized StoreEntry get(Object key) throws IOException {
+        load();
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        HashEntry result = getBin(key).find(entry);
+        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
+    }
+
+    /**
+     * Remove the entry stored under key and return it (null if absent). May
+     * shrink the bin array when the index has emptied out substantially.
+     */
+    public synchronized StoreEntry remove(Object key) throws IOException {
+        load();
+        StoreEntry result = null;
+        HashEntry entry = new HashEntry();
+        entry.setKey((Comparable)key);
+        HashEntry he = getBin(key).remove(entry);
+        if (he != null) {
+            this.size--;
+            result = this.indexManager.getIndex(he.getIndexOffset());
+        }
+        // Shrink heuristic: once past the low water mark, if the index has
+        // dropped to less than half its peak size, rebuild with fewer bins
+        // (at least 128) and reset the peak tracker.
+        if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size *2)) {
+            int newSize = this.size/this.keysPerPage;
+            newSize = Math.max(128, newSize);
+            this.highestSize=0;
+            resize(newSize);
+            
+        }
+        return result;
+    }
+
+    public synchronized boolean containsKey(Object key) throws IOException {
+        return get(key) != null;
+    }
+
+    /**
+     * Remove all entries by deleting and recreating the backing file.
+     */
+    public synchronized void clear() throws IOException {
+        unload();
+        delete();
+        openIndexFile();
+        load();
+    }
+
+    /**
+     * Close the index and delete its backing file.
+     */
+    public synchronized void delete() throws IOException {
+        unload();
+        if (file.exists()) {
+            file.delete();
+        }
+        length = 0;
+    }
+
+    /**
+     * Fetch the full page with the given id, via the page cache when enabled.
+     * Throws IllegalStateException if the page on disk is marked inactive.
+     */
+    HashPage lookupPage(long pageId) throws IOException {
+        HashPage result = null;
+        if (pageId >= 0) {
+            result = getFromCache(pageId);
+            if (result == null) {
+                result = getFullPage(pageId);
+                if (result != null) {
+                    if (result.isActive()) {
+                        addToCache(result);
+                    } else {
+                        throw new IllegalStateException("Trying to access an inactive page: " + pageId);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Allocate a page for the given bin, reusing a free page if one exists,
+     * otherwise appending a new page at the end of the file.
+     */
+    HashPage createPage(int binId) throws IOException {
+        HashPage result = getNextFreePage();
+        if (result == null) {  
+            // allocate one
+            result = new HashPage(keysPerPage);
+            result.setId(length);
+            result.setBinId(binId);
+            writePageHeader(result);
+            length += pageSize;
+            // Write a NOT_SET marker just past the new page so the file is
+            // extended to cover it.
+            indexFile.seek(length);
+            indexFile.write(HashEntry.NOT_SET);
+        }
+        addToCache(result);
+        return result;
+    }
+
+    /**
+     * Return a page to the free list, marking it inactive on disk.
+     */
+    void releasePage(HashPage page) throws IOException {
+        removeFromCache(page);
+        page.reset();
+        page.setActive(false);
+        writePageHeader(page);
+        freeList.add(page);
+    }
+
+    // Pop a page off the free list (if any), reactivate it and persist the
+    // updated header.
+    private HashPage getNextFreePage() throws IOException {
+        HashPage result = null;
+        if(!freeList.isEmpty()) {
+            result = freeList.removeFirst();
+            result.setActive(true);
+            result.reset();
+            writePageHeader(result);
+        }
+        return result;
+    }
+
+    /**
+     * Serialize the page (header + entries) and write it at its file offset.
+     * Fails if the marshalled form no longer fits in pageSize bytes.
+     */
+    void writeFullPage(HashPage page) throws IOException {
+        dataOut.reset();
+        page.write(keyMarshaller, dataOut);
+        if (dataOut.size() > pageSize) {
+            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+        }
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(), 0, dataOut.size());
+    }
+
+    /**
+     * Write only the fixed-size page header at the page's file offset.
+     */
+    void writePageHeader(HashPage page) throws IOException {
+        dataOut.reset();
+        page.writeHeader(dataOut);
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
+    }
+
+    /**
+     * Read a complete page (header + entries) from the given file offset.
+     */
+    HashPage getFullPage(long id) throws IOException {
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer, 0, pageSize);
+        dataIn.restart(readBuffer);
+        HashPage page = new HashPage(keysPerPage);
+        page.setId(id);
+        page.read(keyMarshaller, dataIn);
+        return page;
+    }
+
+    /**
+     * Read only a page header from the given file offset.
+     */
+    HashPage getPageHeader(long id) throws IOException {
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+        dataIn.restart(readBuffer);
+        HashPage page = new HashPage(keysPerPage);
+        page.setId(id);
+        page.readHeader(dataIn);
+        return page;
+    }
+
+    /**
+     * Register a page (loaded from disk) with the bin recorded in its header,
+     * growing the bin array if the recorded bin id is out of range.
+     */
+    void addToBin(HashPage page) throws IOException {
+        int index = page.getBinId();
+        // NOTE(review): this compares against numberOfBins but copies
+        // bins.length elements into an array of size index+1; it appears to
+        // assume numberOfBins == bins.length - confirm, since the arraycopy
+        // would fail if index+1 < bins.length.
+        if (index >= numberOfBins) {
+            HashBin[] newBins = new HashBin[index+1];
+            System.arraycopy(this.bins, 0, newBins, 0, this.bins.length);
+            this.bins=newBins;
+        }
+        HashBin bin = getBin(index);
+        bin.addHashPageInfo(page.getId(), page.getPersistedSize());
+    }
+
+    // Lazily materialize the HashBin at the given slot.
+    private HashBin getBin(int index) {
+        
+        HashBin result = bins[index];
+        if (result == null) {
+            result = new HashBin(this, index, pageSize / keySize);
+            bins[index] = result;
+            activeBins++;
+        }
+        return result;
+    }
+
+    // Open the backing RandomAccessFile if it is not already open, creating
+    // parent directories as needed.
+    private void openIndexFile() throws IOException {
+        if (indexFile == null) {
+            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
+            IOHelper.mkdirs(file.getParentFile());
+            indexFile = new RandomAccessFile(file, "rw");
+        }
+    }
+    
+    // Map a key to its bin via the supplemental hash and the bit mask.
+    private HashBin getBin(Object key) {
+        int hash = hash(key);
+        int i = indexFor(hash, bins.length);
+        return getBin(i);
+    }
+
+    private HashPage getFromCache(long pageId) {
+        HashPage result = null;
+        if (enablePageCaching) {
+            result = pageCache.get(pageId);
+        }
+        return result;
+    }
+
+    private void addToCache(HashPage page) {
+        if (enablePageCaching) {
+            pageCache.put(page.getId(), page);
+        }
+    }
+
+    private void removeFromCache(HashPage page) {
+        if (enablePageCaching) {
+            pageCache.remove(page.getId());
+        }
+    }
+    
+    // Scan the whole file page by page, rebuilding the free list and bins.
+    // Only runs when the index is currently unloaded (e.g. right after
+    // doCompress()/resize() called unload()).
+    private void doLoad() throws IOException {
+        long offset = 0;
+        if (loaded.compareAndSet(false, true)) {
+            while ((offset + pageSize) <= indexFile.length()) {
+                indexFile.seek(offset);
+                indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+                dataIn.restart(readBuffer);
+                HashPage page = new HashPage(keysPerPage);
+                page.setId(offset);
+                page.readHeader(dataIn);
+                if (!page.isActive()) {
+                    page.reset();
+                    freeList.add(page);
+                } else {
+                    addToBin(page);
+                    size+=page.size();
+                }
+                offset += pageSize;
+            }
+            length=offset;
+        }
+    }
+    
+    // Compact the index: copy all active entries into a scratch index file,
+    // replace this index's file with the compacted copy, then reload.
+    private void doCompress() throws IOException {
+        String backFileName = name + "-COMPRESS";
+        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+        backIndex.setKeyMarshaller(keyMarshaller);
+        backIndex.setKeySize(getKeySize());
+        backIndex.setNumberOfBins(getNumberOfBins());
+        backIndex.setPageSize(getPageSize());
+        backIndex.load();
+        File backFile = backIndex.file;
+        long offset = 0;
+        while ((offset + pageSize) <= indexFile.length()) {
+            indexFile.seek(offset);
+            HashPage page = getFullPage(offset);
+            if (page.isActive()) {
+                for (HashEntry entry : page.getEntries()) {
+                    backIndex.getBin(entry.getKey()).put(entry);
+                    backIndex.size++;
+                }
+            }
+            page=null;
+            offset += pageSize;
+        }
+        backIndex.unload();
+      
+        unload();
+        IOHelper.deleteFile(file);
+        IOHelper.copyFile(backFile, file);
+        IOHelper.deleteFile(backFile);
+        openIndexFile();
+        doLoad();
+    }
+    
+    // Rebuild the index with a different (power-of-two) bin count. Largely
+    // duplicates doCompress() except for the bin count and threshold update.
+    private void resize(int newCapacity) throws IOException {
+        if (bins.length < getMaximumCapacity()) {
+            if (newCapacity != numberOfBins) {
+                int capacity = 1;
+                while (capacity < newCapacity) {
+                    capacity <<= 1;
+                }
+                newCapacity=capacity;
+                if (newCapacity != numberOfBins) {
+                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
+                    
+                    // NOTE(review): "-REISZE" is a typo for "-RESIZE"; harmless
+                    // (it is only a scratch file suffix) but worth fixing.
+                    String backFileName = name + "-REISZE";
+                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+                    backIndex.setKeyMarshaller(keyMarshaller);
+                    backIndex.setKeySize(getKeySize());
+                    backIndex.setNumberOfBins(newCapacity);
+                    backIndex.setPageSize(getPageSize());
+                    backIndex.load();
+                    File backFile = backIndex.file;
+                    long offset = 0;
+                    while ((offset + pageSize) <= indexFile.length()) {
+                        indexFile.seek(offset);
+                        HashPage page = getFullPage(offset);
+                        if (page.isActive()) {
+                            for (HashEntry entry : page.getEntries()) {
+                                backIndex.getBin(entry.getKey()).put(entry);
+                                backIndex.size++;
+                            }
+                        }
+                        page=null;
+                        offset += pageSize;
+                    }
+                    backIndex.unload();
+                  
+                    unload();
+                    IOHelper.deleteFile(file);
+                    IOHelper.copyFile(backFile, file);
+                    IOHelper.deleteFile(backFile);
+                    setNumberOfBins(newCapacity);
+                    bins = new HashBin[newCapacity];
+                    threshold = calculateThreashold();
+                    openIndexFile();
+                    doLoad();
+                }
+            }
+        }else {
+            // At maximum capacity: disable further growth-triggered resizes.
+            threshold = Integer.MAX_VALUE;
+            return;
+        }
+    }
+    
+    // NOTE(review): method name misspells "threshold"; private, so renaming
+    // would be safe. With the default loadFactor of 50 this yields ~50
+    // entries per bin before a resize is triggered.
+    private int calculateThreashold() {
+        return (int)(bins.length * loadFactor);
+    }
+    
+    
+    public String toString() {
+        String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
+        return str;
+    }
+      
+
+    // Supplemental hash to spread poor hashCode() implementations across the
+    // power-of-two bin mask (same shape as java.util.HashMap's spreader).
+    static int hash(Object x) {
+        int h = x.hashCode();
+        h += ~(h << 9);
+        h ^= h >>> 14;
+        h += h << 4;
+        h ^= h >>> 10;
+        return h;
+    }
+
+    // Map a hash to a slot; relies on length being a power of two.
+    static int indexFor(int h, int length) {
+        return h & (length - 1);
+    }
+
+    static {
+        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
+        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
+        DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+        MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
+        DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndexMBean.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndexMBean.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndexMBean.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashIndexMBean.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.hash;
+
+import org.apache.kahadb.IndexMBean;
+
+/**
+ * JMX management view of a {@link HashIndex}: exposes its tuning parameters
+ * and current statistics. All getters reflect live index state.
+ */
+public interface HashIndexMBean extends IndexMBean {
+
+    /**
+     * @return the configured key size, in bytes
+     */
+    int getKeySize();
+
+    /**
+     * @param keySize the key size, in bytes, to use
+     */
+    void setKeySize(int keySize);
+
+    /**
+     * @return the configured page size, in bytes
+     */
+    int getPageSize();
+
+    /**
+     * @return the configured number of hash bins
+     */
+    int getNumberOfBins();
+
+    /**
+     * @return whether page caching is enabled
+     */
+    boolean isEnablePageCaching();
+
+    /**
+     * @return the maximum number of pages held in the page cache
+     */
+    int getPageCacheSize();
+
+    /**
+     * @return the number of entries stored in the index
+     */
+    int getSize();
+
+    /**
+     * @return the number of bins currently in use
+     */
+    int getActiveBins();
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPage.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPage.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPage.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.hash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+
+/**
+ * A fixed-capacity page of HashEntry records belonging to one bin of a
+ * HashIndex. A page knows its file offset (id), its owning bin id, and whether
+ * it is active; the on-disk layout is a fixed-size header (PAGE_HEADER_SIZE
+ * bytes) followed by the marshalled entries.
+ * (Original javadoc said "A Page within a HashPage", which was circular.)
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPage {
+    // active flag (1) + nextFreePageId (8) + binId (4) + persistedSize (4)
+    static final int PAGE_HEADER_SIZE = 17;
+    // NOTE(review): currently unused in this class; kept for parity with the
+    // rest of the package.
+    private static final transient Log LOG = LogFactory.getLog(HashPage.class);
+
+    private int maximumEntries;
+    // File offset of this page within the index file.
+    private long id;
+    private int binId;
+    private List<HashEntry> hashIndexEntries;
+    // Entry count as recorded in the last header read/written from/to disk.
+    private int persistedSize;
+    /*
+     * for persistence only
+     */
+    private long nextFreePageId = HashEntry.NOT_SET;
+    private boolean active = true;
+
+    
+    /**
+     * Constructor
+     * 
+     * @param maximumEntries maximum number of entries this page may hold
+     */
+    public HashPage(int maximumEntries) {
+        this.maximumEntries = maximumEntries;
+        this.hashIndexEntries = new ArrayList<HashEntry>(maximumEntries);
+    }
+
+    // NOTE(review): prints the id twice (via getId() and the field) - kept
+    // as-is since callers may depend on the exact format.
+    public String toString() {
+        return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + persistedSize;
+    }
+
+    /**
+     * Pages are equal iff they have the same file offset (id).
+     */
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof HashPage) {
+            HashPage other = (HashPage)o;
+            result = other.id == id;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return (int)id;
+    }
+
+    boolean isActive() {
+        return this.active;
+    }
+
+    void setActive(boolean active) {
+        this.active = active;
+    }
+
+    
+    long getId() {
+        return id;
+    }
+
+    void setId(long id) {
+        this.id = id;
+    }
+
+    int getPersistedSize() {
+        return persistedSize;
+    }
+
+    /**
+     * Serialize the full page: header, entry count, then each entry via the
+     * key marshaller. Updates persistedSize to the current entry count.
+     */
+    void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        persistedSize=hashIndexEntries.size();
+        writeHeader(dataOut);
+        dataOut.writeInt(persistedSize);
+        for (HashEntry entry : hashIndexEntries) {
+            entry.write(keyMarshaller, dataOut);
+        }
+    }
+
+    /**
+     * Deserialize a full page previously written by write(). The extra
+     * readInt() discards the duplicate entry count that write() emits after
+     * the header (the header itself already carries persistedSize).
+     */
+    void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+        readHeader(dataIn);
+        dataIn.readInt();
+        int size = persistedSize;
+        hashIndexEntries.clear();
+        for (int i = 0; i < size; i++) {
+            HashEntry entry = new HashEntry();
+            entry.read(keyMarshaller, dataIn);
+            hashIndexEntries.add(entry);
+        }
+    }
+
+    /**
+     * Read just the fixed-size header fields (must mirror writeHeader()).
+     */
+    void readHeader(DataInput dataIn) throws IOException {
+        active = dataIn.readBoolean();
+        nextFreePageId = dataIn.readLong();
+        binId = dataIn.readInt();
+        persistedSize = dataIn.readInt();
+    }
+
+    /**
+     * Write just the fixed-size header fields (must mirror readHeader()).
+     * Updates persistedSize to the current entry count before writing it.
+     */
+    void writeHeader(DataOutput dataOut) throws IOException {
+        dataOut.writeBoolean(isActive());
+        dataOut.writeLong(nextFreePageId);
+        dataOut.writeInt(binId);
+        persistedSize=hashIndexEntries.size();
+        dataOut.writeInt(persistedSize);
+    }
+    
+
+    boolean isEmpty() {
+        return hashIndexEntries.isEmpty();
+    }
+
+    boolean isFull() {
+        return hashIndexEntries.size() >= maximumEntries;
+    }
+
+    boolean isUnderflowed() {
+        return hashIndexEntries.size() < (maximumEntries / 2);
+    }
+
+    boolean isOverflowed() {
+        return hashIndexEntries.size() > maximumEntries;
+    }
+
+    // Returns the live backing list - mutations affect this page directly.
+    List<HashEntry> getEntries() {
+        return hashIndexEntries;
+    }
+
+    void setEntries(List<HashEntry> newEntries) {
+        this.hashIndexEntries = newEntries;
+    }
+
+    int getMaximumEntries() {
+        return this.maximumEntries;
+    }
+
+    void setMaximumEntries(int maximumEntries) {
+        this.maximumEntries = maximumEntries;
+    }
+
+    int size() {
+        return hashIndexEntries.size();
+    }
+
+    /**
+     * Discard all entries and reset the persisted count to zero.
+     */
+    void reset() throws IOException {
+        hashIndexEntries.clear();
+        persistedSize=0;
+    }
+
+    void addHashEntry(int index, HashEntry entry) throws IOException {
+        hashIndexEntries.add(index, entry);
+    }
+
+    HashEntry getHashEntry(int index) {
+        HashEntry result = hashIndexEntries.get(index);
+        return result;
+    }
+
+    HashEntry removeHashEntry(int index) throws IOException {
+        HashEntry result = hashIndexEntries.remove(index);
+        return result;
+    }
+
+    // NOTE(review): "Tree" in the name looks like a copy/paste from a BTree
+    // sibling class; renaming would touch callers, so it is kept.
+    void removeAllTreeEntries(List<HashEntry> c) {
+        hashIndexEntries.removeAll(c);
+    }
+
+    /**
+     * @return a detached copy of the entries in [from, to)
+     */
+    List<HashEntry> getSubList(int from, int to) {
+        return new ArrayList<HashEntry>(hashIndexEntries.subList(from, to));
+    }
+
+    /**
+     * @return the binId
+     */
+    int getBinId() {
+        return this.binId;
+    }
+
+    /**
+     * @param binId the binId to set
+     */
+    void setBinId(int binId) {
+        this.binId = binId;
+    }
+
+    /**
+     * Debug helper: page summary followed by each entry. Uses StringBuilder
+     * (no synchronization needed) instead of the original StringBuffer.
+     */
+    String dump() {
+
+        StringBuilder str = new StringBuilder(32);
+        str.append(toString());
+        str.append(": ");
+        for (HashEntry entry : hashIndexEntries) {
+            str.append(entry);
+            str.append(",");
+        }
+        return str.toString();
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPageInfo.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPageInfo.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/HashPageInfo.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.hash;
+
+import java.io.IOException;
+
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * A Page within a HashPageInfo
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPageInfo extends LinkedNode{
+
+    private HashIndex hashIndex;
+    private long id;
+    private int size;
+    private HashPage page;
+    private boolean dirty;
+
+    HashPageInfo(HashIndex index) {
+        this.hashIndex = index;
+    }
+
+    /**
+     * @return the id
+     */
+    long getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the id to set
+     */
+    void setId(long id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the size
+     */
+    int size() {
+        return this.size;
+    }
+    
+    boolean isEmpty() {
+        return size <= 0;
+    }
+
+    /**
+     * @param size the size to set
+     */
+    void setSize(int size) {
+        this.size = size;
+    }
+
+    void addHashEntry(int index, HashEntry entry) throws IOException {
+        page.addHashEntry(index, entry);
+        size=page.size();
+        dirty = true;
+    }
+
+    HashEntry getHashEntry(int index) throws IOException {
+        return page.getHashEntry(index);
+    }
+
+    HashEntry removeHashEntry(int index) throws IOException {
+        HashEntry result = page.removeHashEntry(index);
+        if (result != null) {
+            size=page.size();
+            dirty = true;
+        }
+        return result;
+    }
+
+    String dump() {
+        return page.dump();
+    }
+
+    void begin() throws IOException {
+        if (page == null) {
+            page = hashIndex.lookupPage(id);
+        }
+    }
+
+    void end() throws IOException {
+        if (page != null) {
+            if (dirty) {
+                hashIndex.writeFullPage(page);
+            }
+        }
+        page = null;
+        dirty = false;
+    }
+
+    HashPage getPage() {
+        return page;
+    }
+
+    void setPage(HashPage page) {
+        this.page = page;
+    }
+    
+    public String toString() {
+        return "Page["+id+"] size=" + size;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Disk-based hash implementation of an index for a Map.
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ Kaha index type classes for the Map Container, including a VM implementation.
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeEntry.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeEntry.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeEntry.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.tree;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * Key and index for a BTree
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class TreeEntry implements Comparable {
+
+    static final int NOT_SET = -1;
+    private Comparable key;
+    private long indexOffset;
+    private long prevPageId = NOT_SET;
+    private long nextPageId = NOT_SET;
+
+    public int compareTo(Object o) {
+        if (o instanceof TreeEntry) {
+            TreeEntry other = (TreeEntry)o;
+            return key.compareTo(other.key);
+        } else {
+            return key.compareTo(o);
+        }
+    }
+
+    public boolean equals(Object o) {
+        return compareTo(o) == 0;
+    }
+
+    public int hashCode() {
+        return key.hashCode();
+    }
+
+    public String toString() {
+        return "TreeEntry(" + key + "," + indexOffset + ")prev=" + prevPageId + ",next=" + nextPageId;
+    }
+
+    void reset() {
+        prevPageId = NOT_SET;
+        nextPageId = NOT_SET;
+    }
+
+    TreeEntry copy() {
+        TreeEntry copy = new TreeEntry();
+        copy.key = this.key;
+        copy.indexOffset = this.indexOffset;
+        copy.prevPageId = this.prevPageId;
+        copy.nextPageId = this.nextPageId;
+        return copy;
+    }
+
+    /**
+     * @return the key
+     */
+    Comparable getKey() {
+        return this.key;
+    }
+
+    /**
+     * @param key the key to set
+     */
+    void setKey(Comparable key) {
+        this.key = key;
+    }
+
+    /**
+     * @return the nextPageId
+     */
+    long getNextPageId() {
+        return this.nextPageId;
+    }
+
+    /**
+     * @param nextPageId the nextPageId to set
+     */
+    void setNextPageId(long nextPageId) {
+        this.nextPageId = nextPageId;
+    }
+
+    /**
+     * @return the prevPageId
+     */
+    long getPrevPageId() {
+        return this.prevPageId;
+    }
+
+    /**
+     * @param prevPageId the prevPageId to set
+     */
+    void setPrevPageId(long prevPageId) {
+        this.prevPageId = prevPageId;
+    }
+
+    /**
+     * @return the indexOffset
+     */
+    long getIndexOffset() {
+        return this.indexOffset;
+    }
+
+    /**
+     * @param indexOffset the indexOffset to set
+     */
+    void setIndexOffset(long indexOffset) {
+        this.indexOffset = indexOffset;
+    }
+
+    boolean hasChildPagesReferences() {
+        return prevPageId != NOT_SET || nextPageId != NOT_SET;
+    }
+
+    void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        keyMarshaller.writePayload(key, dataOut);
+        dataOut.writeLong(indexOffset);
+        dataOut.writeLong(nextPageId);
+        dataOut.writeLong(prevPageId);
+    }
+
+    void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+        key = (Comparable)keyMarshaller.readPayload(dataIn);
+        indexOffset = dataIn.readLong();
+        nextPageId = dataIn.readLong();
+        prevPageId = dataIn.readLong();
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeIndex.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreeIndex.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.tree;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LRUCache;
+
+/**
+ * BTree implementation
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class TreeIndex implements Index {
+
+    private static final String NAME_PREFIX = "tree-index-";
+    private static final int DEFAULT_PAGE_SIZE;
+    private static final int DEFAULT_KEY_SIZE;
+    private static final Log LOG = LogFactory.getLog(TreeIndex.class);
+    private final String name;
+    private File directory;
+    private File file;
+    private RandomAccessFile indexFile;
+    private IndexManager indexManager;
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int keySize = DEFAULT_KEY_SIZE;
+    private int keysPerPage = pageSize / keySize;
+    private TreePage root;
+    private LRUCache<Long, TreePage> pageCache;
+    private DataByteArrayInputStream dataIn;
+    private DataByteArrayOutputStream dataOut;
+    private byte[] readBuffer;
+    private Marshaller keyMarshaller;
+    private long length;
+    private TreePage firstFree;
+    private TreePage lastFree;
+    private AtomicBoolean loaded = new AtomicBoolean();
+    private boolean enablePageCaching = true;
+    private int pageCacheSize = 10;
+
+    /**
+     * Constructor
+     * 
+     * @param directory
+     * @param name
+     * @param indexManager
+     * @throws IOException
+     */
+    public TreeIndex(File directory, String name, IndexManager indexManager) throws IOException {
+        this.directory = directory;
+        this.name = name;
+        this.indexManager = indexManager;
+        pageCache = new LRUCache<Long, TreePage>(pageCacheSize, pageCacheSize, 0.75f, true);
+        openIndexFile();
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller
+     */
+    public void setKeyMarshaller(Marshaller marshaller) {
+        this.keyMarshaller = marshaller;
+    }
+
+    /**
+     * @return the keySize
+     */
+    public int getKeySize() {
+        return this.keySize;
+    }
+
+    /**
+     * @param keySize the keySize to set
+     */
+    public void setKeySize(int keySize) {
+        this.keySize = keySize;
+        if (loaded.get()) {
+            throw new RuntimeException("Pages already loaded - can't reset key size");
+        }
+    }
+
+    /**
+     * @return the pageSize
+     */
+    public int getPageSize() {
+        return this.pageSize;
+    }
+
+    /**
+     * @param pageSize the pageSize to set
+     */
+    public void setPageSize(int pageSize) {
+        if (loaded.get() && pageSize != this.pageSize) {
+            throw new RuntimeException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize = pageSize;
+    }
+
+    public boolean isTransient() {
+        return false;
+    }
+
+    /**
+     * @return the enablePageCaching
+     */
+    public boolean isEnablePageCaching() {
+        return this.enablePageCaching;
+    }
+
+    /**
+     * @param enablePageCaching the enablePageCaching to set
+     */
+    public void setEnablePageCaching(boolean enablePageCaching) {
+        this.enablePageCaching = enablePageCaching;
+    }
+
+    /**
+     * @return the pageCacheSize
+     */
+    public int getPageCacheSize() {
+        return this.pageCacheSize;
+    }
+
+    /**
+     * @param pageCacheSize the pageCacheSize to set
+     */
+    public void setPageCacheSize(int pageCacheSize) {
+        this.pageCacheSize = pageCacheSize;
+        pageCache.setMaxCacheSize(pageCacheSize);
+    }
+
+    public void load() {
+        if (loaded.compareAndSet(false, true)) {
+            keysPerPage = pageSize / keySize;
+            dataIn = new DataByteArrayInputStream();
+            dataOut = new DataByteArrayOutputStream(pageSize);
+            readBuffer = new byte[pageSize];
+            try {
+                openIndexFile();
+                long offset = 0;
+                while ((offset + pageSize) <= indexFile.length()) {
+                    indexFile.seek(offset);
+                    indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
+                    dataIn.restart(readBuffer);
+                    TreePage page = new TreePage(keysPerPage);
+                    page.setTree(this);
+                    page.setId(offset);
+                    page.readHeader(dataIn);
+                    if (!page.isActive()) {
+                        if (lastFree != null) {
+                            lastFree.setNextFreePageId(offset);
+                            indexFile.seek(lastFree.getId());
+                            dataOut.reset();
+                            lastFree.writeHeader(dataOut);
+                            indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
+                            lastFree = page;
+                        } else {
+                            lastFree = page;
+                            firstFree = page;
+                        }
+                    } else if (root == null && page.isRoot()) {
+                        root = getFullPage(offset);
+                    }
+                    offset += pageSize;
+                }
+                length = offset;
+                if (root == null) {
+                    root = createRoot();
+                }
+            } catch (IOException e) {
+                LOG.error("Failed to load index ", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            if (indexFile != null) {
+                indexFile.close();
+                indexFile = null;
+                pageCache.clear();
+                root = null;
+                firstFree = null;
+                lastFree = null;
+            }
+        }
+    }
+
+    public void store(Object key, StoreEntry value) throws IOException {
+        TreeEntry entry = new TreeEntry();
+        entry.setKey((Comparable)key);
+        entry.setIndexOffset(value.getOffset());
+        root.put(entry);
+    }
+
+    public StoreEntry get(Object key) throws IOException {
+        TreeEntry entry = new TreeEntry();
+        entry.setKey((Comparable)key);
+        TreeEntry result = root.find(entry);
+        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
+    }
+
+    public StoreEntry remove(Object key) throws IOException {
+        TreeEntry entry = new TreeEntry();
+        entry.setKey((Comparable)key);
+        TreeEntry result = root.remove(entry);
+        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
+    }
+
+    public boolean containsKey(Object key) throws IOException {
+        TreeEntry entry = new TreeEntry();
+        entry.setKey((Comparable)key);
+        return root.find(entry) != null;
+    }
+
+    public void clear() throws IOException {
+        unload();
+        delete();
+        openIndexFile();
+        load();
+    }
+
+    public void delete() throws IOException {
+        unload();
+        if (file.exists()) {
+            boolean result = file.delete();
+        }
+        length = 0;
+    }
+
+    /**
+     * @return the root
+     */
+    TreePage getRoot() {
+        return this.root;
+    }
+
+    TreePage lookupPage(long pageId) throws IOException {
+        TreePage result = null;
+        if (pageId >= 0) {
+            if (root != null && root.getId() == pageId) {
+                result = root;
+            } else {
+                result = getFromCache(pageId);
+            }
+            if (result == null) {
+                result = getFullPage(pageId);
+                if (result != null) {
+                    if (result.isActive()) {
+                        addToCache(result);
+                    } else {
+                        throw new IllegalStateException("Trying to access an inactive page: " + pageId + " root is " + root);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    TreePage createRoot() throws IOException {
+        TreePage result = createPage(-1);
+        root = result;
+        return result;
+    }
+
+    TreePage createPage(long parentId) throws IOException {
+        TreePage result = getNextFreePage();
+        if (result == null) {
+            // allocate one
+            result = new TreePage(keysPerPage);
+            result.setId(length);
+            result.setTree(this);
+            result.setParentId(parentId);
+            writePage(result);
+            length += pageSize;
+            indexFile.seek(length);
+            indexFile.write(TreeEntry.NOT_SET);
+        }
+        addToCache(result);
+        return result;
+    }
+
+    void releasePage(TreePage page) throws IOException {
+        removeFromCache(page);
+        page.reset();
+        page.setActive(false);
+        if (lastFree == null) {
+            firstFree = page;
+            lastFree = page;
+        } else {
+            lastFree.setNextFreePageId(page.getId());
+            writePage(lastFree);
+        }
+        writePage(page);
+    }
+
+    private TreePage getNextFreePage() throws IOException {
+        TreePage result = null;
+        if (firstFree != null) {
+            if (firstFree.equals(lastFree)) {
+                result = firstFree;
+                firstFree = null;
+                lastFree = null;
+            } else {
+                result = firstFree;
+                firstFree = getPage(firstFree.getNextFreePageId());
+                if (firstFree == null) {
+                    lastFree = null;
+                }
+            }
+            result.setActive(true);
+            result.reset();
+            result.saveHeader();
+        }
+        return result;
+    }
+
+    void writeFullPage(TreePage page) throws IOException {
+        dataOut.reset();
+        page.write(keyMarshaller, dataOut);
+        if (dataOut.size() > pageSize) {
+            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+        }
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(), 0, dataOut.size());
+    }
+
+    void writePage(TreePage page) throws IOException {
+        dataOut.reset();
+        page.writeHeader(dataOut);
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
+    }
+
+    TreePage getFullPage(long id) throws IOException {
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer, 0, pageSize);
+        dataIn.restart(readBuffer);
+        TreePage page = new TreePage(keysPerPage);
+        page.setId(id);
+        page.setTree(this);
+        page.read(keyMarshaller, dataIn);
+        return page;
+    }
+
+    TreePage getPage(long id) throws IOException {
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
+        dataIn.restart(readBuffer);
+        TreePage page = new TreePage(keysPerPage);
+        page.setId(id);
+        page.setTree(this);
+        page.readHeader(dataIn);
+        return page;
+    }
+
+    private TreePage getFromCache(long pageId) {
+        TreePage result = null;
+        if (enablePageCaching) {
+            result = pageCache.get(pageId);
+        }
+        return result;
+    }
+
+    private void addToCache(TreePage page) {
+        if (enablePageCaching) {
+            pageCache.put(page.getId(), page);
+        }
+    }
+
+    private void removeFromCache(TreePage page) {
+        if (enablePageCaching) {
+            pageCache.remove(page.getId());
+        }
+    }
+
+    protected void openIndexFile() throws IOException {
+        if (indexFile == null) {
+            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
+            IOHelper.mkdirs(file.getParentFile());
+            indexFile = new RandomAccessFile(file, "rw");
+        }
+    }
+
+    static {
+        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
+        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
+    }
+
+    public int getSize() {
+        return 0;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePage.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePage.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePage.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.tree;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+
+/**
+ * Page in a BTree
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class TreePage {
+
+    static final int PAGE_HEADER_SIZE = 18;
+    private static final transient Log LOG = LogFactory.getLog(TreePage.class);
+
+    static enum Flavour {
+        LESS, MORE
+    }
+
+    private TreeIndex tree;
+    private int maximumEntries;
+    private long id;
+    private long parentId = TreeEntry.NOT_SET;
+    private boolean leaf = true;
+    private List<TreeEntry> treeEntries;
+    /*
+     * for persistence only
+     */
+    private long nextFreePageId = TreeEntry.NOT_SET;
+    private boolean active = true;
+
+    /**
+     * Constructor
+     * 
+     * @param tree
+     * @param id
+     * @param parentId
+     * @param maximumEntries
+     */
+    TreePage(TreeIndex tree, long id, long parentId, int maximumEntries) {
+        this(maximumEntries);
+        this.tree = tree;
+        this.id = id;
+        this.parentId = parentId;
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param maximumEntries
+     */
+    public TreePage(int maximumEntries) {
+        this.maximumEntries = maximumEntries;
+        this.treeEntries = new ArrayList<TreeEntry>(maximumEntries);
+    }
+
+    public String toString() {
+        return "TreePage[" + getId() + "]parent=" + getParentId();
+    }
+
    /**
     * Two pages are equal when they share the same id (file offset);
     * null and non-TreePage arguments compare unequal.
     */
    public boolean equals(Object o) {
        boolean result = false;
        if (o instanceof TreePage) {
            TreePage other = (TreePage)o;
            result = other.id == id;
        }
        return result;
    }
+
    /**
     * Hash on the page id, truncated to its low 32 bits - consistent with
     * equals(), which compares the full long id.
     */
    public int hashCode() {
        return (int)id;
    }
+
+    boolean isActive() {
+        return this.active;
+    }
+
+    void setActive(boolean active) {
+        this.active = active;
+    }
+
+    long getNextFreePageId() {
+        return this.nextFreePageId;
+    }
+
+    void setNextFreePageId(long nextPageId) {
+        this.nextFreePageId = nextPageId;
+    }
+
+    long getId() {
+        return id;
+    }
+
+    void setId(long id) {
+        this.id = id;
+    }
+
+    void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        writeHeader(dataOut);
+        dataOut.writeInt(treeEntries.size());
+        for (TreeEntry entry : treeEntries) {
+            entry.write(keyMarshaller, dataOut);
+        }
+    }
+
+    void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
+        readHeader(dataIn);
+        int size = dataIn.readInt();
+        treeEntries.clear();
+        for (int i = 0; i < size; i++) {
+            TreeEntry entry = new TreeEntry();
+            entry.read(keyMarshaller, dataIn);
+            treeEntries.add(entry);
+        }
+    }
+
+    void readHeader(DataInput dataIn) throws IOException {
+        active = dataIn.readBoolean();
+        leaf = dataIn.readBoolean();
+        setParentId(dataIn.readLong());
+        nextFreePageId = dataIn.readLong();
+    }
+
+    void writeHeader(DataOutput dataOut) throws IOException {
+        dataOut.writeBoolean(isActive());
+        dataOut.writeBoolean(isLeaf());
+        dataOut.writeLong(getParentId());
+        dataOut.writeLong(nextFreePageId);
+    }
+
+    boolean isEmpty() {
+        return treeEntries.isEmpty();
+    }
+
+    boolean isFull() {
+        return treeEntries.size() >= maximumEntries;
+    }
+
+    boolean isRoot() {
+        return getParentId() < 0;
+    }
+
    /**
     * @return true when this page has no child pages.
     * NOTE(review): not a pure getter - an empty page is forced back to
     * leaf status as a side effect before the flag is returned.
     */
    boolean isLeaf() {
        if (treeEntries.isEmpty()) {
            leaf = true;
        }
        return leaf;
    }
+
+    boolean isUnderflowed() {
+        return treeEntries.size() < (maximumEntries / 2);
+    }
+
+    boolean isOverflowed() {
+        return treeEntries.size() > maximumEntries;
+    }
+
+    void setLeaf(boolean newValue) {
+        this.leaf = newValue;
+    }
+
+    TreePage getParent() throws IOException {
+        return tree.lookupPage(parentId);
+    }
+
+    long getParentId() {
+        return parentId;
+    }
+
    /**
     * Re-parent this page and immediately persist the page header.
     *
     * @param newId id of the new parent page (TreeEntry.NOT_SET for root)
     * @throws IOException if writing the header fails
     * @throws IllegalStateException if asked to become its own parent
     */
    void setParentId(long newId) throws IOException {
        if (newId == this.id) {
            throw new IllegalStateException("Cannot set page as a child of itself " + this
                                            + " trying to set parentId = " + newId);
        }
        this.parentId = newId;
        tree.writePage(this);
    }
+
+    List<TreeEntry> getEntries() {
+        return treeEntries;
+    }
+
+    void setEntries(List<TreeEntry> newEntries) {
+        this.treeEntries = newEntries;
+    }
+
+    int getMaximumEntries() {
+        return this.maximumEntries;
+    }
+
+    void setMaximumEntries(int maximumEntries) {
+        this.maximumEntries = maximumEntries;
+    }
+
+    int size() {
+        return treeEntries.size();
+    }
+
+    TreeIndex getTree() {
+        return this.tree;
+    }
+
+    void setTree(TreeIndex tree) {
+        this.tree = tree;
+    }
+
+    void reset() throws IOException {
+        treeEntries.clear();
+        setParentId(TreeEntry.NOT_SET);
+        setNextFreePageId(TreeEntry.NOT_SET);
+        setLeaf(true);
+    }
+
+    public TreeEntry find(TreeEntry key) throws IOException {
+        int low = 0;
+        int high = size() - 1;
+        long pageId = -1;
+        while (low <= high) {
+            int mid = (low + high) >> 1;
+            TreeEntry te = getTreeEntry(mid);
+            int cmp = te.compareTo(key);
+            if (cmp == 0) {
+                return te;
+            } else if (cmp < 0) {
+                low = mid + 1;
+                pageId = te.getNextPageId();
+            } else {
+                high = mid - 1;
+                pageId = te.getPrevPageId();
+            }
+        }
+        TreePage page = tree.lookupPage(pageId);
+        if (page != null) {
+            return page.find(key);
+        }
+        return null;
+    }
+
    /**
     * Insert an entry into the tree rooted at this page.  May only be called
     * on the root page.
     *
     * @param newEntry the entry to insert
     * @return the previous entry when an existing key was replaced (doInsert
     *         swaps the old index offset into it), or null for a fresh insert
     * @throws IllegalStateException if called on a non-root page
     * @throws IOException on page I/O failure
     */
    TreeEntry put(TreeEntry newEntry) throws IOException {
        TreeEntry result = null;
        if (isRoot()) {
            if (isEmpty()) {
                insertTreeEntry(0, newEntry);
            } else {
                result = doInsert(null, newEntry);
            }
        } else {
            throw new IllegalStateException("insert() should not be called on non root page - " + this);
        }
        return result;
    }
+
+    TreeEntry remove(TreeEntry entry) throws IOException {
+        TreeEntry result = null;
+        if (isRoot()) {
+            if (!isEmpty()) {
+                result = doRemove(entry);
+            }
+        } else {
+            throw new IllegalStateException("remove() should not be called on non root page");
+        }
+        return result;
+    }
+
    /**
     * Recursive insert.  Finds the entry on this page closest to newEntry:
     * an exact key match swaps the stored index offset; otherwise the insert
     * descends into the closest child page, or lands on this page (spilling
     * via doOverflow() when the page is full).
     *
     * @param flavour the direction the search took to reach this page
     *        (null when called on the root)
     * @param newEntry the entry to insert
     * @return the replaced entry (carrying the old index offset) on a key
     *         match, otherwise null
     * @throws IOException on page I/O failure
     */
    private TreeEntry doInsert(Flavour flavour, TreeEntry newEntry) throws IOException {
        TreeEntry result = null;
        TreePageEntry closest = findClosestEntry(newEntry);
        if (closest != null) {
            TreeEntry closestEntry = closest.getTreeEntry();
            TreePage closestPage = closest.getTreePage();
            int cmp = closestEntry.compareTo(newEntry);
            if (cmp == 0) {
                // same key - we actually just need to pass back the value:
                // swap offsets so the caller receives the old one in newEntry
                long oldValue = closestEntry.getIndexOffset();
                closestEntry.setIndexOffset(newEntry.getIndexOffset());
                newEntry.setIndexOffset(oldValue);
                result = newEntry;
                save();
            } else if (closestPage != null) {
                // descend into the child on the side the comparison chose
                result = closestPage.doInsert(closest.getFlavour(), newEntry);
            } else {
                if (!isFull()) {
                    insertTreeEntry(closest.getIndex(), newEntry);
                    save();
                } else {
                    doOverflow(flavour, newEntry);
                }
            }
        } else {
            if (!isFull()) {
                doInsertEntry(newEntry);
                save();
            } else {
                // need to insert the new entry and propagate up the highest
                // value
                doOverflow(flavour, newEntry);
            }
        }
        return result;
    }
+
    /**
     * Handle inserting into a full page.  A non-root page inserts the entry
     * and pushes an entry off one end up to its parent; the root splits into
     * two children beneath a brand-new root.
     *
     * @param flavour which end of the page to push from (LESS = front,
     *        otherwise back); null forces the root/split path
     * @param newEntry the entry being inserted
     * @return the page the new entry finally landed on
     * @throws IOException on page I/O failure
     */
    private TreePage doOverflow(Flavour flavour, TreeEntry newEntry) throws IOException {
        TreePage result = this;
        TreeEntry theEntry = newEntry;
        if (!isFull()) {
            doInsertEntry(newEntry);
            save();
        } else {
            if (!isRoot() && flavour != null) {
                // we aren't the root, but to ensure the correct distribution
                // we need to insert the new entry, take an entry off the end
                // of the page, and pass that one up the tree to find a home
                doInsertEntry(newEntry);
                if (flavour == Flavour.LESS) {
                    theEntry = removeTreeEntry(0);
                    theEntry.reset();
                    theEntry.setNextPageId(getId());
                } else {
                    theEntry = removeTreeEntry(size() - 1);
                    theEntry.reset();
                    theEntry.setPrevPageId(getId());
                }
                save();

                result = getParent().doOverflow(flavour, theEntry);
                if (!theEntry.equals(newEntry)) {
                    // the newEntry stayed here
                    result = this;
                }
            } else {
                // we are the root and need to split: the middle entry moves
                // into a new root, the upper half moves to a new right-hand
                // sibling, and this page keeps the lower half
                doInsertEntry(newEntry);
                int midIndex = size() / 2;
                TreeEntry midEntry = removeTreeEntry(midIndex);
                List<TreeEntry> subList = getSubList(midIndex, size());
                removeAllTreeEntries(subList);
                TreePage newRoot = tree.createRoot();
                newRoot.setLeaf(false);
                this.setParentId(newRoot.getId());
                save(); // we are no longer root - need to save - we may be
                // looked up very soon!
                TreePage rightPage = tree.createPage(newRoot.getId());
                rightPage.setEntries(subList);
                rightPage.checkLeaf();
                resetParentId(rightPage.getId(), rightPage.getEntries());
                midEntry.setNextPageId(rightPage.getId());
                midEntry.setPrevPageId(this.getId());
                newRoot.insertTreeEntry(0, midEntry);
                resetParentId(newRoot.getId(), newRoot.getEntries());
                save();
                rightPage.save();
                newRoot.save();
            }
        }
        return result;
    }
+
+    private TreeEntry doRemove(TreeEntry entry) throws IOException {
+        TreeEntry result = null;
+        TreePageEntry closest = findClosestEntry(entry);
+        if (closest != null) {
+            TreeEntry closestEntry = closest.getTreeEntry();
+            if (closestEntry != null) {
+                TreePage closestPage = closest.getTreePage();
+                int cmp = closestEntry.compareTo(entry);
+                if (cmp == 0) {
+                    result = closest.getTreeEntry();
+                    int index = closest.getIndex();
+                    removeTreeEntry(index);
+                    save();
+                    // ensure we don't loose children
+                    doUnderflow(result, index);
+                } else if (closestPage != null) {
+                    closestPage.doRemove(entry);
+                }
+            }
+        }
+        return result;
+    }
+
    /**
     * Rebalance this page after removals: while the page is an underflowed
     * interior page, repeatedly pull replacement entries up from child pages;
     * if it ends up an underflowed leaf, merge its entries upward and release
     * the page.
     *
     * @return true if the page is removed
     * @throws IOException on page I/O failure
     */
    private boolean doUnderflow() throws IOException {
        boolean result = false;
        boolean working = true;
        while (working && isUnderflowed() && !isEmpty() && !isLeaf()) {
            int lastIndex = size() - 1;
            TreeEntry entry = getTreeEntry(lastIndex);
            working = doUnderflow(entry, lastIndex);
        }
        if (isUnderflowed() && isLeaf()) {
            result = doUnderflowLeaf();
        }
        return result;
    }
+
    /**
     * Rebalance around the entry at the given index by pulling a replacement
     * entry up from one of its child pages (the next-side page first, then
     * the prev-side page).  Child pages emptied in the process are marked as
     * leaves and may themselves be released via their own doUnderflow().
     *
     * @param entry the entry whose child pages are inspected
     * @param index the entry's index on this page
     * @return true if a replacement was pulled up or this page collapsed as
     *         an underflowed leaf
     * @throws IOException on page I/O failure
     */
    private boolean doUnderflow(TreeEntry entry, int index) throws IOException {
        boolean result = false;
        // pull an entry up from a leaf to fill the empty space
        if (entry.getNextPageId() != TreeEntry.NOT_SET) {
            TreePage page = tree.lookupPage(entry.getNextPageId());
            if (page != null && !page.isEmpty()) {
                TreeEntry replacement = page.removeTreeEntry(0);
                TreeEntry copy = replacement.copy();
                // re-point the replacement's children from the child page to us
                checkParentIdForRemovedPageEntry(copy, page.getId(), getId());
                if (!page.isEmpty()) {
                    copy.setNextPageId(page.getId());
                    page.setParentId(this.id);
                } else {
                    page.setLeaf(true);
                }
                int replacementIndex = doInsertEntry(copy);
                if (page.doUnderflow()) {
                    // page removed so update our replacement
                    resetPageReference(replacementIndex, copy.getNextPageId());
                    copy.setNextPageId(TreeEntry.NOT_SET);
                } else {
                    page.save();
                }
                save();
                result = true;
            }
        }
        // ensure we don't lose the previous bit of the tree
        if (entry.getPrevPageId() != TreeEntry.NOT_SET) {
            TreeEntry prevEntry = (index > 0) ? getTreeEntry(index - 1) : null;
            if (prevEntry == null || prevEntry.getNextPageId() != entry.getPrevPageId()) {
                TreePage page = tree.lookupPage(entry.getPrevPageId());
                if (page != null && !page.isEmpty()) {
                    TreeEntry replacement = page.removeTreeEntry(page.getEntries().size() - 1);
                    TreeEntry copy = replacement.copy();
                    // check children pages of the replacement point to the
                    // correct place
                    checkParentIdForRemovedPageEntry(copy, page.getId(), getId());
                    if (!page.isEmpty()) {
                        copy.setPrevPageId(page.getId());
                    } else {
                        page.setLeaf(true);
                    }
                    insertTreeEntry(index, copy);
                    // if we overflow - the page the replacement ends up on
                    TreePage landed = null;
                    TreeEntry removed = null;
                    if (isOverflowed()) {
                        TreePage parent = getParent();
                        if (parent != null) {
                            removed = getTreeEntry(0);
                            Flavour flavour = getFlavour(parent, removed);
                            if (flavour == Flavour.LESS) {
                                removed = removeTreeEntry(0);
                                landed = parent.doOverflow(flavour, removed);
                            } else {
                                removed = removeTreeEntry(size() - 1);
                                landed = parent.doOverflow(Flavour.MORE, removed);
                            }
                        }
                    }
                    if (page.doUnderflow()) {
                        if (landed == null || landed.equals(this)) {
                            landed = this;
                        }

                        // NOTE(review): this branch borrowed the replacement
                        // from the PREVIOUS page, yet clears references using
                        // copy.getNextPageId() before unsetting the prev
                        // pointer; the symmetric next-side branch above uses
                        // the next id.  Possibly getPrevPageId() was intended
                        // here - confirm against the on-disk layout before
                        // changing.
                        resetPageReference(copy.getNextPageId());
                        landed.resetPageReference(copy.getNextPageId());
                        copy.setPrevPageId(TreeEntry.NOT_SET);
                        landed.save();
                    } else {
                        page.save();
                    }
                    save();
                    result = true;
                }
                // now we need to check we haven't overflowed this page
            }
        }
        if (!result) {
            save();
        }
        // now see if we need to save this page
        result |= doUnderflowLeaf();
        save();
        return result;
    }
+
    /**
     * Collapse an underflowed leaf: push each remaining entry up into the
     * parent (placed via the parent's overflow logic), detach this page from
     * the parent and release it back to the tree.
     *
     * @return true if this page was released
     * @throws IOException on page I/O failure
     */
    private boolean doUnderflowLeaf() throws IOException {
        boolean result = false;
        // if we have underflowed - and we are a leaf - push entries further
        // up the tree and delete ourselves
        if (isUnderflowed() && isLeaf()) {
            List<TreeEntry> list = new ArrayList<TreeEntry>(treeEntries);
            treeEntries.clear();
            for (TreeEntry entry : list) {
                // need to check for each iteration - we might get promoted to
                // root
                TreePage parent = getParent();
                if (parent != null) {
                    Flavour flavour = getFlavour(parent, entry);
                    TreePage landedOn = parent.doOverflow(flavour, entry);
                    // children of the moved entry must point at its new home
                    checkParentIdForRemovedPageEntry(entry, getId(), landedOn.getId());
                }
            }
            TreePage parent = getParent();
            if (parent != null) {
                parent.checkLeaf();
                parent.removePageId(getId());
                parent.doUnderflow();
                parent.save();
                tree.releasePage(this);
                result = true;
            }
        }
        return result;
    }
+
    /**
     * Choose the overflow direction for placing an entry into a page:
     * MORE when the page's last entry sorts after the given entry,
     * LESS otherwise; null when the page is null or empty.
     */
    private Flavour getFlavour(TreePage page, TreeEntry entry) {
        Flavour result = null;
        if (page != null && !page.getEntries().isEmpty()) {
            TreeEntry last = page.getEntries().get(page.getEntries().size() - 1);
            if (last.compareTo(entry) > 0) {
                result = Flavour.MORE;
            } else {
                result = Flavour.LESS;
            }
        }
        return result;
    }
+
+    private void checkLeaf() {
+        boolean result = false;
+        for (TreeEntry entry : treeEntries) {
+            if (entry.hasChildPagesReferences()) {
+                result = true;
+                break;
+            }
+        }
+        setLeaf(!result);
+    }
+
+    private void checkParentIdForRemovedPageEntry(TreeEntry entry, long oldPageId, long newPageId)
+        throws IOException {
+        TreePage page = tree.lookupPage(entry.getPrevPageId());
+        if (page != null && page.getParentId() == oldPageId) {
+            page.setParentId(newPageId);
+            page.save();
+        }
+        page = tree.lookupPage(entry.getNextPageId());
+        if (page != null && page.getParentId() == oldPageId) {
+            page.setParentId(newPageId);
+            page.save();
+        }
+    }
+
+    private void removePageId(long pageId) {
+        for (TreeEntry entry : treeEntries) {
+            if (entry.getNextPageId() == pageId) {
+                entry.setNextPageId(TreeEntry.NOT_SET);
+            }
+            if (entry.getPrevPageId() == pageId) {
+                entry.setPrevPageId(TreeEntry.NOT_SET);
+            }
+        }
+    }
+
    /**
     * Binary-search this page for the entry nearest the key.
     *
     * @param key the entry to compare against
     * @return the closest entry together with the child page on the side the
     *         search last took, the direction taken (null when the very first
     *         comparison is an exact match) and the insertion index held in
     *         low; null only when the page is empty.
     *         NOTE(review): the (low + high) >> 1 midpoint is only safe while
     *         page sizes stay well below Integer.MAX_VALUE - the unsigned
     *         shift ((low + high) >>> 1) would be the defensive form.
     * @throws IOException on a page read failure
     */
    private TreePageEntry findClosestEntry(TreeEntry key) throws IOException {
        TreePageEntry result = null;
        TreeEntry treeEntry = null;
        Flavour flavour = null;
        long pageId = -1;
        int low = 0;
        int high = size() - 1;
        int mid = low;
        while (low <= high) {
            mid = (low + high) >> 1;
            treeEntry = getTreeEntry(mid);
            int cmp = treeEntry.compareTo(key);
            if (cmp < 0) {
                low = mid + 1;
                pageId = treeEntry.getNextPageId();
                flavour = Flavour.LESS;
            } else if (cmp > 0) {
                high = mid - 1;
                pageId = treeEntry.getPrevPageId();
                flavour = Flavour.MORE;
            } else {
                // got exact match - record its index in low and stop
                low = mid;
                break;
            }
        }
        if (treeEntry != null) {
            TreePage treePage = tree.lookupPage(pageId);
            result = new TreePageEntry(treeEntry, treePage, flavour, low);
        }
        return result;
    }
+
+    private int doInsertEntry(TreeEntry newEntry) throws IOException {
+        int low = 0;
+        int high = size() - 1;
+        while (low <= high) {
+            int mid = (low + high) >> 1;
+            TreeEntry midVal = getTreeEntry(mid);
+            int cmp = midVal.compareTo(newEntry);
+            if (cmp < 0) {
+                low = mid + 1;
+            } else if (cmp > 0) {
+                high = mid - 1;
+            }
+        }
+        insertTreeEntry(low, newEntry);
+        return low;
+    }
+
    /**
     * Insert an entry at the given index, stitching up the child page
     * references shared with the neighbouring entries: a neighbour's
     * duplicate reference is cleared first, and the new entry then inherits
     * any reference it is missing from that neighbour.  The statement order
     * is significant.
     */
    private void insertTreeEntry(int index, TreeEntry entry) throws IOException {
        int p = index - 1;
        int n = index;
        TreeEntry prevEntry = (p >= 0 && p < treeEntries.size()) ? treeEntries.get(p) : null;
        TreeEntry nextEntry = (n >= 0 && n < treeEntries.size()) ? treeEntries.get(n) : null;
        if (prevEntry != null) {
            if (prevEntry.getNextPageId() == entry.getNextPageId()) {
                // left neighbour would duplicate our next reference
                prevEntry.setNextPageId(TreeEntry.NOT_SET);
            }
            if (entry.getPrevPageId() == TreeEntry.NOT_SET) {
                // inherit the left neighbour's next pointer as our prev
                entry.setPrevPageId(prevEntry.getNextPageId());
            }
        }
        if (nextEntry != null) {
            if (nextEntry.getPrevPageId() == entry.getPrevPageId()) {
                // right neighbour would duplicate our prev reference
                nextEntry.setPrevPageId(TreeEntry.NOT_SET);
            }
            if (entry.getNextPageId() == TreeEntry.NOT_SET) {
                // inherit the right neighbour's prev pointer as our next
                entry.setNextPageId(nextEntry.getPrevPageId());
            }
        }
        addTreeEntry(index, entry);
    }
+
+    private void resetPageReference(int index, long pageId) {
+        int p = index - 1;
+        int n = index;
+        TreeEntry prevEntry = (p >= 0 && p < treeEntries.size()) ? treeEntries.get(p) : null;
+        TreeEntry nextEntry = (n >= 0 && n < treeEntries.size()) ? treeEntries.get(n) : null;
+        if (prevEntry != null) {
+            if (prevEntry.getNextPageId() == pageId) {
+                prevEntry.setNextPageId(TreeEntry.NOT_SET);
+            }
+        }
+        if (nextEntry != null) {
+            if (nextEntry.getPrevPageId() == pageId) {
+                nextEntry.setPrevPageId(TreeEntry.NOT_SET);
+            }
+        }
+    }
+
+    private boolean resetPageReference(long pageId) {
+        boolean updated = false;
+        for (TreeEntry entry : treeEntries) {
+            if (entry.getPrevPageId() == pageId) {
+                entry.setPrevPageId(TreeEntry.NOT_SET);
+                updated = true;
+            }
+            if (entry.getNextPageId() == pageId) {
+                entry.setNextPageId(TreeEntry.NOT_SET);
+                updated = true;
+            }
+        }
+        return updated;
+    }
+
+    private void resetParentId(long newParentId, List<TreeEntry> entries) throws IOException {
+        Set<Long> set = new HashSet<Long>();
+        for (TreeEntry entry : entries) {
+            if (entry != null) {
+                set.add(entry.getPrevPageId());
+                set.add(entry.getNextPageId());
+            }
+        }
+        for (Long pageId : set) {
+            TreePage page = tree.lookupPage(pageId);
+            if (page != null) {
+                page.setParentId(newParentId);
+            }
+        }
+    }
+
    /**
     * Low-level list insert.  IOException is declared to keep the signature
     * uniform with the other entry mutators; this implementation itself does
     * no I/O.
     */
    private void addTreeEntry(int index, TreeEntry entry) throws IOException {
        treeEntries.add(index, entry);
    }
+
+    private TreeEntry removeTreeEntry(int index) throws IOException {
+        TreeEntry result = treeEntries.remove(index);
+        return result;
+    }
+
+    private void removeAllTreeEntries(List<TreeEntry> c) {
+        treeEntries.removeAll(c);
+    }
+
+    private List<TreeEntry> getSubList(int from, int to) {
+        return new ArrayList<TreeEntry>(treeEntries.subList(from, to));
+    }
+
+    private TreeEntry getTreeEntry(int index) {
+        TreeEntry result = treeEntries.get(index);
+        return result;
+    }
+
    /**
     * Persist this page via the index.  NOTE(review): despite the name this
     * delegates to the same writePage() used elsewhere - presumably a
     * header-only write as opposed to save()'s writeFullPage(); confirm
     * against TreeIndex.
     *
     * @throws IOException on write failure
     */
    void saveHeader() throws IOException {
        tree.writePage(this);
    }
+
    /**
     * Persist the full page (writeFullPage, as opposed to saveHeader()'s
     * writePage) via the index.
     *
     * @throws IOException on write failure
     */
    void save() throws IOException {
        tree.writeFullPage(this);
    }
+
+    protected void dump() throws IOException {
+        LOG.info(this);
+        Set<Long> set = new HashSet<Long>();
+        for (TreeEntry entry : treeEntries) {
+            if (entry != null) {
+                LOG.info(entry);
+                set.add(entry.getPrevPageId());
+                set.add(entry.getNextPageId());
+            }
+        }
+        for (Long pageId : set) {
+            TreePage page = tree.lookupPage(pageId);
+            if (page != null) {
+                page.dump();
+            }
+        }
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePageEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePageEntry.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePageEntry.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/TreePageEntry.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index.tree;
+
+/**
+ * A conglomarate used for return results from a tree lookup
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class TreePageEntry {
+
+    private TreeEntry treeEntry;
+    private TreePage treePage;
+    private TreePage.Flavour flavour;
+    private int index = -1;
+
+    TreePageEntry(TreeEntry treeEntry, TreePage treePage, TreePage.Flavour flavour, int index) {
+        this.treeEntry = treeEntry;
+        this.treePage = treePage;
+        this.flavour = flavour;
+        this.index = index;
+    }
+
+    /**
+     * @return the flavour
+     */
+    TreePage.Flavour getFlavour() {
+        return this.flavour;
+    }
+
+    /**
+     * @param flavour the flavour to set
+     */
+    void setFlavour(TreePage.Flavour flavour) {
+        this.flavour = flavour;
+    }
+
+    /**
+     * @return the treePage
+     */
+    TreePage getTreePage() {
+        return this.treePage;
+    }
+
+    /**
+     * @param treePage the treePage to set
+     */
+    void setTreePage(TreePage treePage) {
+        this.treePage = treePage;
+    }
+
+    /**
+     * @return the index
+     */
+    public int getIndex() {
+        return this.index;
+    }
+
+    /**
+     * @param index the index to set
+     */
+    public void setIndex(int index) {
+        this.index = index;
+    }
+
+    /**
+     * @return the treeEntry
+     */
+    public TreeEntry getTreeEntry() {
+        return this.treeEntry;
+    }
+
+    /**
+     * @param treeEntry the treeEntry to set
+     */
+    public void setTreeEntry(TreeEntry treeEntry) {
+        this.treeEntry = treeEntry;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A BTree implementation of an index for a Map.
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ Kaha implementation classes
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Fast message persistence implementation.
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/package.html
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message