activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r503176 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/kaha/impl/container/ main/java/org/apache/activemq/kaha/impl/index/ main/java/o...
Date Sat, 03 Feb 2007 06:54:00 GMT
Author: rajdavies
Date: Fri Feb  2 22:53:59 2007
New Revision: 503176

URL: http://svn.apache.org/viewvc?view=rev&rev=503176
Log:
Addition of Indexes based on BTree and Hashing

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreePageEntry.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?view=diff&rev=503176&r1=503175&r2=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Fri Feb  2 22:53:59 2007
@@ -171,14 +171,5 @@
      */
     public V getValue(StoreEntry Valuelocation);
     
-    /**
-     * Set the internal index map
-     * @param map
-     */
-    public void setIndexMap(Map map);
-    
-    /**
-     * @return the index map
-     */
-    public Map getIndexMap();
+   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=503176&r1=503175&r2=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Fri Feb  2 22:53:59 2007
@@ -202,7 +202,7 @@
             if(root==null){
                 root=mapsContainer.addRoot(im,containerId);
             }
-            result=new MapContainerImpl(containerId,root,im,dm,indexType);
+            result=new MapContainerImpl(directory,containerId,root,im,dm,indexType);
             maps.put(containerId,result);
         }
         return result;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java?view=diff&rev=503176&r1=503175&r2=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java Fri Feb  2 22:53:59 2007
@@ -22,6 +22,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import org.apache.activemq.kaha.impl.index.IndexItem;
+import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
 /**
 * A Set of keys for the container
@@ -37,19 +39,29 @@
     
     
     public boolean contains(Object o){
-        return container.getInternalKeySet().contains(o);
+        return container.containsKey(o);
     }
 
     public Iterator iterator(){
-        return new ContainerKeySetIterator(container,container.getInternalKeySet().iterator());
+        return new ContainerKeySetIterator(container);
     }
 
     public Object[] toArray(){
-        return container.getInternalKeySet().toArray();
+        List list = new ArrayList();
+        IndexItem item = container.getInternalList().getRoot();
+        while ((item = container.getInternalList().getNextEntry(item)) != null) {
+            list.add(container.getKey(item));
+        }
+        return list.toArray();
     }
 
     public Object[] toArray(Object[] a){
-        return container.getInternalKeySet().toArray(a);
+        List list = new ArrayList();
+        IndexItem item = container.getInternalList().getRoot();
+        while ((item = container.getInternalList().getNextEntry(item)) != null) {
+            list.add(container.getKey(item));
+        }
+        return list.toArray(a);
     }
 
     public boolean add(Object o){
@@ -61,7 +73,13 @@
     }
 
     public boolean containsAll(Collection c){
-       return container.getInternalKeySet().containsAll(c);
+        boolean result = true;
+        for (Object key:c) {
+            if (!(result&=container.containsKey(key))) {
+                break;
+            }
+        }
+       return result;
     }
 
     public boolean addAll(Collection c){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java?view=diff&rev=503176&r1=503175&r2=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java Fri Feb  2 22:53:59 2007
@@ -18,6 +18,8 @@
 package org.apache.activemq.kaha.impl.container;
 
 import java.util.Iterator;
+import org.apache.activemq.kaha.impl.index.IndexItem;
+import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
 
 /**
@@ -27,25 +29,31 @@
 */
 public class ContainerKeySetIterator implements Iterator{
     private MapContainerImpl container;
-    private Iterator  iter;
-    private Object currentKey;
-    ContainerKeySetIterator(MapContainerImpl container,Iterator iter){
+    private IndexLinkedList list;
+    protected IndexItem nextItem;
+    protected IndexItem currentItem;
+   
+    ContainerKeySetIterator(MapContainerImpl container){
         this.container = container;
-        this.iter = iter;
+        this.list=container.getInternalList();
+        this.currentItem=list.getRoot();
+        this.nextItem=list.getNextEntry(currentItem);
     }
     
     public boolean hasNext(){
-        return iter.hasNext();
+        return nextItem!=null;
     }
 
     public Object next(){
-        currentKey =  iter.next();
-        return currentKey;
+        currentItem=nextItem;
+        Object result=container.getKey(nextItem);
+        nextItem=list.getNextEntry(nextItem);
+        return result;
     }
 
     public void remove(){
-       if (currentKey != null){
-           container.remove(currentKey);
-       }
+        if(currentItem!=null){
+            container.remove(currentItem);
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=503176&r1=503175&r2=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Fri Feb  2 22:53:59 2007
@@ -1,27 +1,22 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.kaha.impl.container;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -34,9 +29,12 @@
 import org.apache.activemq.kaha.StoreLocation;
 import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.kaha.impl.data.Item;
+import org.apache.activemq.kaha.impl.index.Index;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 import org.apache.activemq.kaha.impl.index.IndexManager;
+import org.apache.activemq.kaha.impl.index.VMIndex;
+import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -48,24 +46,33 @@
 public final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
 
     private static final Log log=LogFactory.getLog(MapContainerImpl.class);
-    protected Map indexMap;
+    protected Index index;
     protected Marshaller keyMarshaller=Store.ObjectMarshaller;
     protected Marshaller valueMarshaller=Store.ObjectMarshaller;
+    protected File directory;
 
-    public MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType){
+    public MapContainerImpl(File directory,ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
+            String indexType){
         super(id,root,indexManager,dataManager,indexType);
+        this.directory = directory;
     }
-    
-    public synchronized void init(){
+
+    public synchronized void init() {
         super.init();
-        if(indexMap == null){
+        if(index==null){
             if(indexType.equals(IndexTypes.DISK_INDEX)){
-                this.indexMap = new HashMap();
+                String name = containerId.getDataContainerName() + "_" + containerId.getKey();
+                try{
+                    this.index=new HashIndex(directory, name , indexManager);
+                }catch(IOException e){
+                    log.error("Failed to create HashIndex",e);
+                    throw new RuntimeException(e);
+                }
             }else{
-                this.indexMap = new HashMap();
+                this.index=new VMIndex();
             }
         }
-            
+        index.setKeyMarshaller(keyMarshaller);
     }
 
     /*
@@ -80,12 +87,15 @@
                 loaded=true;
                 try{
                     init();
+                    index.load();
                     long nextItem=root.getNextItem();
                     while(nextItem!=Item.POSITION_NOT_SET){
                         IndexItem item=indexManager.getIndex(nextItem);
                         StoreLocation data=item.getKeyDataItem();
                         Object key=dataManager.readItem(keyMarshaller,data);
-                        indexMap.put(key,item);
+                        if(index.isTransient()){
+                            index.store(key,item);
+                        }
                         indexList.add(item);
                         nextItem=item.getNextItem();
                     }
@@ -106,7 +116,11 @@
         checkClosed();
         if(loaded){
             loaded=false;
-            indexMap.clear();
+            try{
+                index.unload();
+            }catch(IOException e){
+                log.warn("Failed to unload the index",e);
+            }
             indexList.clear();
         }
     }
@@ -114,6 +128,9 @@
     public synchronized void setKeyMarshaller(Marshaller keyMarshaller){
         checkClosed();
         this.keyMarshaller=keyMarshaller;
+        if(index!=null){
+            index.setKeyMarshaller(keyMarshaller);
+        }
     }
 
     public synchronized void setValueMarshaller(Marshaller valueMarshaller){
@@ -128,7 +145,7 @@
      */
     public synchronized int size(){
         load();
-        return indexMap.size();
+        return indexList.size();
     }
 
     /*
@@ -138,7 +155,7 @@
      */
     public synchronized boolean isEmpty(){
         load();
-        return indexMap.isEmpty();
+        return indexList.isEmpty();
     }
 
     /*
@@ -148,7 +165,12 @@
      */
     public synchronized boolean containsKey(Object key){
         load();
-        return indexMap.containsKey(key);
+        try{
+            return index.containsKey(key);
+        }catch(IOException e){
+            log.error("Failed trying to find key: "+key,e);
+            throw new RuntimeException(e);
+        }
     }
 
     /*
@@ -160,7 +182,12 @@
         load();
         Object result=null;
         StoreEntry item=null;
-        item=(StoreEntry)indexMap.get(key);
+        try{
+            item=(StoreEntry)index.get(key);
+        }catch(IOException e){
+            log.error("Failed trying to get key: "+key,e);
+            throw new RuntimeException(e);
+        }
         if(item!=null){
             result=getValue(item);
         }
@@ -237,14 +264,18 @@
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
-     *      java.lang.Object)
+     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
      */
     public synchronized Object put(Object key,Object value){
         load();
         Object result=remove(key);;
         IndexItem item=write(key,value);
-        indexMap.put(key,item);
+        try{
+            index.store(key,item);
+        }catch(IOException e){
+            log.error("Failed trying to insert key: "+key,e);
+            throw new RuntimeException(e);
+        }
         indexList.add(item);
         return result;
     }
@@ -256,19 +287,24 @@
      */
     public synchronized Object remove(Object key){
         load();
-        Object result=null;
-        IndexItem item=(IndexItem)indexMap.get(key);
-        if(item!=null){
-            //refresh the index
-            item = (IndexItem)indexList.refreshEntry(item);
-            indexMap.remove(key);
-            result=getValue(item);
-            IndexItem prev=indexList.getPrevEntry(item);
-            IndexItem next=indexList.getNextEntry(item);
-            indexList.remove(item);
-            delete(item,prev,next);
+        try{
+            Object result=null;
+            IndexItem item=(IndexItem)index.get(key);
+            if(item!=null){
+                // refresh the index
+                item=(IndexItem)indexList.refreshEntry(item);
+                index.remove(key);
+                result=getValue(item);
+                IndexItem prev=indexList.getPrevEntry(item);
+                IndexItem next=indexList.getNextEntry(item);
+                indexList.remove(item);
+                delete(item,prev,next);
+            }
+            return result;
+        }catch(IOException e){
+            log.error("Failed trying to remove key: "+key,e);
+            throw new RuntimeException(e);
         }
-        return result;
     }
 
     public synchronized boolean removeValue(Object o){
@@ -308,51 +344,69 @@
     public synchronized void clear(){
         checkClosed();
         loaded=true;
-        if(indexMap!=null){
-            indexMap.clear();
+        if(index!=null){
+            try{
+                index.clear();
+            }catch(IOException e){
+                log.error("Failed trying clear index",e);
+                throw new RuntimeException(e);
+            }
         }
         super.clear();
         doClear();
     }
-    
+
     /**
      * Add an entry to the Store Map
+     * 
      * @param key
      * @param value
      * @return the StoreEntry associated with the entry
      */
-    public synchronized StoreEntry place(Object key, Object value) {
+    public synchronized StoreEntry place(Object key,Object value){
         load();
-        if(indexMap.containsKey(key)){
-            remove(key);
+        try{
+            if(index.containsKey(key)){
+                remove(key);
+            }
+            IndexItem item=write(key,value);
+            index.store(key,item);
+            indexList.add(item);
+            return item;
+        }catch(IOException e){
+            log.error("Failed trying to palce key: "+key,e);
+            throw new RuntimeException(e);
         }
-        IndexItem item=write(key,value);
-        indexMap.put(key,item);
-        indexList.add(item);
-        return item;
     }
-    
+
     /**
      * Remove an Entry from ther Map
+     * 
      * @param entry
+     * @throws RuntimeException if the index cannot be updated
      */
-    public synchronized void remove(StoreEntry entry) {
+    public synchronized void remove(StoreEntry entry){
         load();
         IndexItem item=(IndexItem)entry;
         if(item!=null){
-            
-            Object key = getKey(item);
-            indexMap.remove(key);
+            Object key=getKey(item);
+            try{
+                index.remove(key);
+            }catch(IOException e){
+                log.error("Failed trying to remove entry: "+entry,e);
+                throw new RuntimeException(e);
+            }
             IndexItem prev=indexList.getPrevEntry(item);
             IndexItem next=indexList.getNextEntry(item);
             indexList.remove(item);
             delete(item,prev,next);
         }
     }
-    
+
     /**
      * Get the value from it's location
-     * @param item 
+     * 
+     * @param item
      * @return the value associated with the store entry
      */
     public synchronized Object getValue(StoreEntry item){
@@ -361,7 +415,7 @@
         if(item!=null){
             try{
                 // ensure this value is up to date
-                //item=indexList.getEntry(item);
+                // item=indexList.getEntry(item);
                 StoreLocation data=item.getValueDataItem();
                 result=dataManager.readItem(valueMarshaller,data);
             }catch(IOException e){
@@ -374,7 +428,8 @@
 
     /**
      * Get the Key object from it's location
-     * @param item 
+     * 
+     * @param item
      * @return the Key Object associated with the StoreEntry
      */
     public synchronized Object getKey(StoreEntry item){
@@ -391,11 +446,6 @@
         }
         return result;
     }
-    
-
-    protected synchronized Set getInternalKeySet(){
-        return new HashSet(indexMap.keySet());
-    }
 
     protected IndexLinkedList getItemList(){
         return indexList;
@@ -430,22 +480,5 @@
             throw new RuntimeStoreException(e);
         }
         return index;
-    }
-
-    /**
-     * @return
-     * @see org.apache.activemq.kaha.MapContainer#getIndexMap()
-     */
-    public Map getIndexMap(){
-        return indexMap;
-    }
-
-    /**
-     * @param map
-     * @see org.apache.activemq.kaha.MapContainer#setIndexMap(java.util.Map)
-     */
-    public void setIndexMap(Map map){
-        indexMap = map;
-        
     }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,90 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index;
+
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreEntry;
+
+/**
+ * Simpler than a Map
+ * 
+ * @version $Revision: 1.2 $
+ */
+public interface Index{
+
+    /**
+     * clear the index 
+     * @throws IOException 
+     * 
+     */
+    public void clear() throws IOException;
+
+    /**
+     * @param key
+     * @return true if it contains the key
+     * @throws IOException 
+     */
+    public boolean containsKey(Object key) throws IOException;
+
+    /**
+     * remove the index key
+     * 
+     * @param key
+     * @throws IOException 
+     */
+    public void remove(Object key) throws IOException;
+
+    /**
+     * store the key, item
+     * 
+     * @param key
+     * @param entry
+     * @throws IOException 
+     */
+    public void store(Object key,StoreEntry entry) throws IOException;
+    
+    
+    /**
+     * @param key
+     * @return the entry
+     * @throws IOException 
+     */
+    public StoreEntry get(Object key) throws IOException;
+
+    /**
+     * @return true if the index is transient
+     */
+    public boolean isTransient();
+    
+    /**
+     * load indexes
+     */
+    public void load();
+    
+    /**
+     * unload indexes
+     * @throws IOException 
+     */
+    public void unload() throws IOException;
+    
+    
+    
+    /**
+     * Set the marshaller for key objects
+     * @param marshaller
+     */
+    public void setKeyMarshaller(Marshaller marshaller);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,97 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreEntry;
+
+/**
+ * Index implementation using a HashMap
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class VMIndex implements Index{
+
+    private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
+
+    /**
+     * 
+     * @see org.apache.activemq.kaha.impl.index.Index#clear()
+     */
+    public void clear(){
+        map.clear();
+    }
+
+    /**
+     * @param key
+     * @return true if the index contains the key
+     * @see org.apache.activemq.kaha.impl.index.Index#containsKey(java.lang.Object)
+     */
+    public boolean containsKey(Object key){
+        return map.containsKey(key);
+    }
+
+    /**
+     * @param key
+     * @see org.apache.activemq.kaha.impl.index.Index#remove(java.lang.Object)
+     */
+    public void remove(Object key){
+        map.remove(key);
+    }
+
+    /**
+     * @param key
+     * @param entry
+     * @see org.apache.activemq.kaha.impl.index.Index#store(java.lang.Object,
+     *      org.apache.activemq.kaha.StoreEntry)
+     */
+    public void store(Object key,StoreEntry entry){
+        map.put(key,entry);
+    }
+
+    /**
+     * @param key
+     * @return the entry
+     */
+    public StoreEntry get(Object key){
+        return map.get(key);
+    }
+
+    /**
+     * @return true if the index is transient
+     */
+    public boolean isTransient(){
+        return true;
+    }
+
+    /**
+     * load indexes
+     */
+    public void load(){
+    }
+
+    /**
+     * unload indexes
+     */
+    public void unload(){
+        map.clear();
+    }
+    
+   
+    public void setKeyMarshaller(Marshaller marshaller){
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,297 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.hash;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Bin in a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashBin{
+
+    private HashIndex hashIndex;
+    private int id;
+    private int maximumEntries;
+    private int size=0;
+    private List<HashPageInfo> hashPages=new ArrayList<HashPageInfo>();
+
+    /**
+     * Constructor
+     * 
+     * @param hashIndex
+     * @param id
+     * @param maximumEntries
+     */
+    HashBin(HashIndex hashIndex,int id,int maximumEntries){
+        this.hashIndex=hashIndex;
+        this.id=id;
+        this.maximumEntries=maximumEntries;
+    }
+
+    public String toString(){
+        return "HashBin["+getId()+"]";
+    }
+
+    public boolean equals(Object o){
+        boolean result=false;
+        if(o instanceof HashBin){
+            HashBin other=(HashBin)o;
+            result=other.id==id;
+        }
+        return result;
+    }
+
+    public int hashCode(){
+        return (int)id;
+    }
+
+    int getId(){
+        return id;
+    }
+
+    void setId(int id){
+        this.id=id;
+    }
+
+    /**
+     * @return true if this bin currently holds no entries
+     */
+    boolean isEmpty(){
+        // previously hard-coded to true regardless of content
+        return size==0;
+    }
+
+    int getMaximumEntries(){
+        return this.maximumEntries;
+    }
+
+    void setMaximumEntries(int maximumEntries){
+        this.maximumEntries=maximumEntries;
+    }
+
+    int size(){
+        return size;
+    }
+
+    HashPageInfo addHashPageInfo(long id,int size){
+        HashPageInfo info=new HashPageInfo(hashIndex);
+        info.setId(id);
+        info.setSize(size);
+        hashPages.add(info);
+        this.size+=size;
+        return info;
+    }
+
+    /**
+     * Binary search of this bin for an entry that compares equal to the given key entry.
+     * 
+     * @param key entry carrying the key to look for
+     * @return the matching entry held by the bin, or null if none compares equal
+     * @throws IOException declared by the page accesses made during the search
+     */
+    public HashEntry find(HashEntry key) throws IOException{
+        HashEntry result=null;
+        try{
+            int low=0;
+            int high=size()-1;
+            while(low<=high){
+                // unsigned shift: the midpoint stays non-negative even if low+high overflows
+                int mid=(low+high)>>>1;
+                HashEntry te=getHashEntry(mid);
+                int cmp=te.compareTo(key);
+                if(cmp==0){
+                    result=te;
+                    break;
+                }else if(cmp<0){
+                    low=mid+1;
+                }else{
+                    high=mid-1;
+                }
+            }
+        }finally{
+            // every page of the bin gets end() called, whether or not the search succeeded
+            end();
+        }
+        return result;
+    }
+
+    /**
+     * Insert an entry at the position found by binary search, or, when an entry with an equal key
+     * already exists, overwrite that entry's index offset.
+     * 
+     * @param newEntry the entry to insert or whose offset replaces the existing one
+     * @throws IOException declared by the page accesses made here
+     */
+    void put(HashEntry newEntry) throws IOException{
+        try{
+            boolean replace=false;
+            int low=0;
+            int high=size()-1;
+            while(low<=high){
+                // unsigned shift: the midpoint stays non-negative even if low+high overflows
+                int mid=(low+high)>>>1;
+                HashEntry midVal=getHashEntry(mid);
+                int cmp=midVal.compareTo(newEntry);
+                if(cmp<0){
+                    low=mid+1;
+                }else if(cmp>0){
+                    high=mid-1;
+                }else{
+                    replace=true;
+                    // NOTE(review): only the in-memory entry is updated on this path; no save() is
+                    // issued for the owning page here - confirm the new offset is persisted elsewhere.
+                    midVal.setIndexOffset(newEntry.getIndexOffset());
+                    break;
+                }
+            }
+            if(!replace){
+                // low is the insertion point left by the search
+                addHashEntry(low,newEntry);
+                size++;
+            }
+        }finally{
+            end();
+        }
+    }
+
+    /**
+     * Binary search for an entry that compares equal to the given one and remove it if found.
+     * 
+     * @param entry entry carrying the key to remove
+     * @throws IOException declared by the page accesses made here
+     */
+    void remove(HashEntry entry) throws IOException{
+        try{
+            int low=0;
+            int high=size()-1;
+            while(low<=high){
+                // unsigned shift: the midpoint stays non-negative even if low+high overflows
+                int mid=(low+high)>>>1;
+                HashEntry te=getHashEntry(mid);
+                int cmp=te.compareTo(entry);
+                if(cmp==0){
+                    removeHashEntry(mid);
+                    size--;
+                    break;
+                }else if(cmp<0){
+                    low=mid+1;
+                }else{
+                    high=mid-1;
+                }
+            }
+        }finally{
+            end();
+        }
+    }
+
+    /**
+     * Insert an entry at the given bin-wide position, then let doOverFlow move a surplus entry on to
+     * the following page if the target page has grown past maximumEntries.
+     * 
+     * @param index bin-wide position at which to insert
+     * @param entry the entry to insert
+     * @throws IOException declared by the page operations used here
+     */
+    private void addHashEntry(int index,HashEntry entry) throws IOException{
+        HashPageInfo page=getInsertPage(index);
+        // NOTE(review): page and offset are derived from index/maximumEntries here, whereas lookups
+        // (getRetrievePage/getRetrieveOffset) walk the actual page sizes; the two agree only while
+        // every page before the target is full - confirm this still holds after removals.
+        int offset=index%maximumEntries;
+        page.addHashEntry(offset,entry);
+        // overflow handling runs before save() is called on this page
+        doOverFlow(index);
+        page.save();
+    }
+
+    private HashEntry removeHashEntry(int index) throws IOException{
+        HashPageInfo page=getRetrievePage(index);
+        int offset=getRetrieveOffset(index);
+        HashEntry result=page.removeHashEntry(offset);
+        doUnderFlow(index);
+        page.save();
+        return result;
+    }
+
+    private HashEntry getHashEntry(int index) throws IOException{
+        HashPageInfo page=getRetrievePage(index);
+        page.begin();
+        int offset=getRetrieveOffset(index);
+        HashEntry result=page.getHashEntry(offset);
+        return result;
+    }
+
+    private int maximumBinSize(){
+        return maximumEntries*hashPages.size();
+    }
+
+    private HashPageInfo getInsertPage(int index) throws IOException{
+        HashPageInfo result=null;
+        if(index>=maximumBinSize()){
+            HashPage page=hashIndex.createPage(id);
+            result=addHashPageInfo(page.getId(),0);
+            result.setPage(page);
+        }else{
+            int offset=index/maximumEntries;
+            result=hashPages.get(offset);
+        }
+        result.begin();
+        return result;
+    }
+
+    private HashPageInfo getRetrievePage(int index) throws IOException{
+        HashPageInfo result=null;
+        int count=0;
+        int pageNo=0;
+        for(HashPageInfo page:hashPages){
+            count+=page.size();
+            if(index<count){
+                break;
+            }
+            pageNo++;
+        }
+        result=hashPages.get(pageNo);
+        result.begin();
+        return result;
+    }
+
+    private int getRetrieveOffset(int index) throws IOException{
+        int result=0;
+        int count=0;
+        for(HashPageInfo page:hashPages){
+            if((index+1)<=(count+page.size())){
+                //count=count==0?count:count+1;
+                result=index-count;
+                break;
+            }
+            count+=page.size();
+        }
+        return result;
+    }
+
+    private int getInsertPageNo(int index){
+        int result=index/maximumEntries;
+        return result;
+    }
+
+    private int getOffset(int index){
+        int result=index%maximumEntries;
+        return result;
+    }
+
+    private void doOverFlow(int index) throws IOException{
+        int pageNo=index/maximumEntries;
+        HashPageInfo info=hashPages.get(pageNo);
+        if(info.size()>maximumEntries){
+            // overflowed
+            HashEntry entry=info.removeHashEntry(info.size()-1);
+            doOverFlow(pageNo+1,entry);
+        }
+    }
+
+    private void doOverFlow(int pageNo,HashEntry entry) throws IOException{
+        HashPageInfo info=null;
+        if(pageNo>=hashPages.size()){
+            HashPage page=hashIndex.createPage(id);
+            info=addHashPageInfo(page.getId(),0);
+            info.setPage(page);
+        }else{
+            info=hashPages.get(pageNo);
+        }
+        info.begin();
+        info.addHashEntry(0,entry);
+        if(info.size()>maximumEntries){
+            // overflowed
+            HashEntry overflowed=info.removeHashEntry(info.size()-1);
+            doOverFlow(pageNo+1,overflowed);
+        }
+        info.save();
+    }
+
+    private void doUnderFlow(int index){
+        int pageNo=index/maximumEntries;
+        int nextPageNo=pageNo+1;
+        if(nextPageNo<hashPages.size()){
+        }
+        HashPageInfo info=hashPages.get(pageNo);
+    }
+
+    private void end(){
+        for(HashPageInfo info:hashPages){
+            info.end();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,98 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.hash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Key and index for DiskBased Hash Index
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashEntry implements Comparable{
+
+    static final int NOT_SET=-1;
+    private Comparable key;
+    private long indexOffset;
+
+    public int compareTo(Object o){
+        if(o instanceof HashEntry){
+            HashEntry other=(HashEntry)o;
+            return key.compareTo(other.key);
+        }else{
+            return key.compareTo(o);
+        }
+    }
+
+    /**
+     * Equality is defined through compareTo: the argument is equal when it compares as 0.
+     * 
+     * @param o the object to compare with; may be null
+     * @return true if o is non-null and compareTo(o) returns 0
+     */
+    public boolean equals(Object o){
+        // equals(null) must answer false instead of failing inside compareTo
+        return o!=null&&compareTo(o)==0;
+    }
+
+    /**
+     * Hash code taken from the key, so that it agrees with equals, which compares keys.
+     * 
+     * @return the key's hash code
+     */
+    public int hashCode(){
+        return key.hashCode();
+    }
+
+    /**
+     * Misspelled predecessor of hashCode(), retained so existing callers keep compiling.
+     * 
+     * @return the same value as hashCode()
+     * @deprecated use hashCode()
+     */
+    public int hasCode(){
+        return hashCode();
+    }
+
+    public String toString(){
+        return "HashEntry("+key+","+indexOffset+")";
+    }
+
+    HashEntry copy(){
+        HashEntry copy=new HashEntry();
+        copy.key=this.key;
+        copy.indexOffset=this.indexOffset;
+        return copy;
+    }
+
+    /**
+     * @return the key
+     */
+    Comparable getKey(){
+        return this.key;
+    }
+
+    /**
+     * @param key the key to set
+     */
+    void setKey(Comparable key){
+        this.key=key;
+    }
+
+    /**
+     * @return the indexOffset
+     */
+    long getIndexOffset(){
+        return this.indexOffset;
+    }
+
+    /**
+     * @param indexOffset the indexOffset to set
+     */
+    void setIndexOffset(long indexOffset){
+        this.indexOffset=indexOffset;
+    }
+
+    void write(Marshaller keyMarshaller,DataOutput dataOut) throws IOException{
+        dataOut.writeLong(indexOffset);
+        keyMarshaller.writePayload(key,dataOut);
+    }
+
+    void read(Marshaller keyMarshaller,DataInput dataIn) throws IOException{
+        indexOffset=dataIn.readLong();
+        key=(Comparable)keyMarshaller.readPayload(dataIn);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,369 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.hash;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.index.Index;
+import org.apache.activemq.kaha.impl.index.IndexManager;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Hash index implementation: keys are hashed into bins whose entries are kept in fixed-size pages of an index file
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex implements Index{
+
+    // NOTE(review): the prefix says "tree" although this is the hash index - it looks copied from the
+    // tree index. Confirm before changing: it is part of the index file's name (see openIndexFile).
+    private static final String NAME_PREFIX="tree-index-";
+    // both defaults are assigned from system properties in the static initializer of this class
+    private static final int DEFAULT_PAGE_SIZE;
+    private static final int DEFAULT_KEY_SIZE;
+    private static final Log log=LogFactory.getLog(HashIndex.class);
+    private final String name;
+    private File directory;
+    private File file;
+    private RandomAccessFile indexFile;
+    private IndexManager indexManager;
+    private int pageSize=DEFAULT_PAGE_SIZE;
+    private int keySize=DEFAULT_KEY_SIZE;
+    private int keysPerPage=pageSize/keySize;
+    private DataByteArrayInputStream dataIn;
+    private DataByteArrayOutputStream dataOut;
+    private byte[] readBuffer;
+    private HashBin[] bins;
+    private Marshaller keyMarshaller;
+    private long length=0;
+    private HashPage firstFree;
+    private HashPage lastFree;
+    private AtomicBoolean loaded=new AtomicBoolean();
+
+    /**
+     * Constructor
+     * 
+     * @param directory
+     * @param name
+     * @param indexManager
+     * @throws IOException
+     */
+    public HashIndex(File directory,String name,IndexManager indexManager) throws IOException{
+        this(directory,name,indexManager,1024);
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param directory
+     * @param name
+     * @param indexManager
+     * @param numberOfBins
+     * @throws IOException
+     */
+    public HashIndex(File directory,String name,IndexManager indexManager,int numberOfBins) throws IOException{
+        this.directory=directory;
+        this.name=name;
+        this.indexManager=indexManager;
+        int capacity=1;
+        while(capacity<numberOfBins)
+            capacity<<=1;
+        this.bins=new HashBin[capacity];
+        openIndexFile();
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller
+     */
+    public void setKeyMarshaller(Marshaller marshaller){
+        this.keyMarshaller=marshaller;
+    }
+
+    /**
+     * @return the keySize
+     */
+    public int getKeySize(){
+        return this.keySize;
+    }
+
+    /**
+     * Set the key size; load() divides the page size by it to obtain the number of keys per page.
+     * 
+     * @param keySize the keySize to set
+     * @throws RuntimeException if the index is already loaded and the value would change
+     */
+    public void setKeySize(int keySize){
+        // validate before assigning so that a rejected call leaves the current key size untouched
+        if(loaded.get()&&keySize!=this.keySize){
+            throw new RuntimeException("Pages already loaded - can't reset key size");
+        }
+        this.keySize=keySize;
+    }
+
+    /**
+     * @return the pageSize
+     */
+    public int getPageSize(){
+        return this.pageSize;
+    }
+
+    /**
+     * @param pageSize the pageSize to set
+     */
+    public void setPageSize(int pageSize){
+        if(loaded.get()&&pageSize!=this.pageSize){
+            throw new RuntimeException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize=pageSize;
+    }
+
+    public boolean isTransient(){
+        return false;
+    }
+
+    /**
+     * Load the index once: scan the index file page by page, reading only each page header, adding
+     * active pages to their bins and chaining inactive pages into the free list.
+     * 
+     * @throws RuntimeException wrapping any IOException raised while reading or writing the file
+     */
+    public void load(){
+        // only the first caller after construction/unload() does the work
+        // NOTE(review): loaded is already true if the scan below fails - confirm that is intended
+        if(loaded.compareAndSet(false,true)){
+            keysPerPage=pageSize/keySize;
+            dataIn=new DataByteArrayInputStream();
+            dataOut=new DataByteArrayOutputStream(pageSize);
+            readBuffer=new byte[pageSize];
+            try{
+                openIndexFile();
+                long offset=0;
+                // visit every complete page in the file; a page's id is its file offset
+                while((offset+pageSize)<=indexFile.length()){
+                    indexFile.seek(offset);
+                    indexFile.readFully(readBuffer,0,HashPage.PAGE_HEADER_SIZE);
+                    dataIn.restart(readBuffer);
+                    HashPage page=new HashPage(keysPerPage);
+                    page.setId(offset);
+                    page.readHeader(dataIn);
+                    if(!page.isActive()){
+                        if(lastFree!=null){
+                            // link the previous free page to this one and rewrite its header on disk
+                            lastFree.setNextFreePageId(offset);
+                            indexFile.seek(lastFree.getId());
+                            dataOut.reset();
+                            lastFree.writeHeader(dataOut);
+                            indexFile.write(dataOut.getData(),0,HashPage.PAGE_HEADER_SIZE);
+                            lastFree=page;
+                        }else{
+                            // first free page found: it is both head and tail of the list
+                            lastFree=firstFree=page;
+                        }
+                    }else{
+                        addToBin(page);
+                    }
+                    offset+=pageSize;
+                }
+                // next page to be allocated starts where the scan stopped
+                length=offset;
+            }catch(IOException e){
+                log.error("Failed to load index ",e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void unload() throws IOException{
+        if(loaded.compareAndSet(true,false)){
+            if(indexFile!=null){
+                indexFile.close();
+                indexFile=null;
+                firstFree=lastFree=null;
+                bins=new HashBin[bins.length];
+            }
+        }
+    }
+
+    public void store(Object key,StoreEntry value) throws IOException{
+        HashEntry entry=new HashEntry();
+        entry.setKey((Comparable)key);
+        entry.setIndexOffset(value.getOffset());
+        getBin(key).put(entry);
+    }
+
+    public StoreEntry get(Object key) throws IOException{
+        HashEntry entry=new HashEntry();
+        entry.setKey((Comparable)key);
+        HashEntry result=getBin(key).find(entry);
+        return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
+    }
+
+    public void remove(Object key) throws IOException{
+        HashEntry entry=new HashEntry();
+        entry.setKey((Comparable)key);
+        getBin(key).remove(entry);
+    }
+
+    public boolean containsKey(Object key) throws IOException{
+        return get(key)!=null;
+    }
+
+    public void clear() throws IOException{
+        unload();
+        delete();
+        openIndexFile();
+        load();
+    }
+
+    /**
+     * Unload the index, delete its file if it exists, and reset the tracked length to zero.
+     * 
+     * @throws IOException if unloading fails
+     */
+    public void delete() throws IOException{
+        unload();
+        // a failed delete used to be dropped silently; log it but keep the best-effort behaviour
+        if(file.exists()&&!file.delete()){
+            log.warn("Failed to delete index file "+file);
+        }
+        length=0;
+    }
+
+    /**
+     * Read the full page with the given id and verify that it is active.
+     * 
+     * @param pageId id of the page; negative ids are not looked up
+     * @return the page, or null when pageId is negative
+     * @throws IOException if the page cannot be read
+     * @throws IllegalStateException if the page read is not active
+     */
+    HashPage lookupPage(long pageId) throws IOException{
+        if(pageId<0){
+            return null;
+        }
+        HashPage page=getFullPage(pageId);
+        if(page!=null&&!page.isActive()){
+            throw new IllegalStateException("Trying to access an inactive page: "+pageId);
+        }
+        return page;
+    }
+
+    /**
+     * Obtain a page for the given bin, reusing one from the free list when available and otherwise
+     * appending a new page at the current end of the index file.
+     * 
+     * @param binId id of the bin that will own the page
+     * @return the page, with its header already written to the file
+     * @throws IOException if the index file cannot be read or written
+     */
+    HashPage createPage(int binId) throws IOException{
+        HashPage result=getNextFreePage();
+        if(result==null){
+            // allocate one
+            result=new HashPage(keysPerPage);
+            result.setId(length);
+            result.setBinId(binId);
+            writePageHeader(result);
+            length+=pageSize;
+            // write a single byte at the new end offset so the file extends over the whole page
+            indexFile.seek(length);
+            indexFile.write(HashEntry.NOT_SET);
+        }else{
+            // a recycled page still carries the bin id of its previous owner; re-stamp it so that
+            // load() adds the page to the correct bin
+            result.setBinId(binId);
+            writePageHeader(result);
+        }
+        return result;
+    }
+
+    /**
+     * Clear a page, mark it inactive and append it to the free list, persisting the affected headers.
+     * 
+     * @param page the page to release
+     * @throws IOException if a page header cannot be written
+     */
+    void releasePage(HashPage page) throws IOException{
+        page.reset();
+        page.setActive(false);
+        if(lastFree==null){
+            firstFree=lastFree=page;
+        }else{
+            lastFree.setNextFreePageId(page.getId());
+            writePageHeader(lastFree);
+            // the released page becomes the new tail; without this the next release would
+            // overwrite the old tail's link and drop this page from the list
+            lastFree=page;
+        }
+        writePageHeader(page);
+    }
+
+    /**
+     * Take the page at the head of the free list, mark it active, clear it and rewrite its header.
+     * 
+     * @return the recycled page, or null when the free list is empty
+     * @throws IOException if a page header cannot be read or written
+     */
+    private HashPage getNextFreePage() throws IOException{
+        HashPage result=null;
+        if(firstFree!=null){
+            if(firstFree.equals(lastFree)){
+                // head and tail have the same id: this is the only page left on the list
+                result=firstFree;
+                firstFree=lastFree=null;
+            }else{
+                result=firstFree;
+                // advance the head by reading the header of the page the old head points to
+                firstFree=getPageHeader(firstFree.getNextFreePageId());
+                if(firstFree==null){
+                    lastFree=null;
+                }
+            }
+            result.setActive(true);
+            result.reset();
+            writePageHeader(result);
+        }
+        return result;
+    }
+
+    void writeFullPage(HashPage page) throws IOException{
+        dataOut.reset();
+        page.write(keyMarshaller,dataOut);
+        if(dataOut.size()>pageSize){
+            throw new IOException("Page Size overflow: pageSize is "+pageSize+" trying to write "+dataOut.size());
+        }
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(),0,dataOut.size());
+    }
+
+    void writePageHeader(HashPage page) throws IOException{
+        dataOut.reset();
+        page.writeHeader(dataOut);
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(),0,HashPage.PAGE_HEADER_SIZE);
+    }
+
+    HashPage getFullPage(long id) throws IOException{
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer,0,pageSize);
+        dataIn.restart(readBuffer);
+        HashPage page=new HashPage(keysPerPage);
+        page.setId(id);
+        page.read(keyMarshaller,dataIn);
+        return page;
+    }
+
+    HashPage getPageHeader(long id) throws IOException{
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer,0,HashPage.PAGE_HEADER_SIZE);
+        dataIn.restart(readBuffer);
+        HashPage page=new HashPage(keysPerPage);
+        page.setId(id);
+        page.readHeader(dataIn);
+        return page;
+    }
+
+    void addToBin(HashPage page){
+        HashBin bin=getBin(page.getBinId());
+        bin.addHashPageInfo(page.getId(),page.getPersistedSize());
+    }
+
+    private HashBin getBin(int index){
+        HashBin result=bins[index];
+        if(result==null){
+            result=new HashBin(this,index,pageSize/keySize);
+            bins[index]=result;
+        }
+        return result;
+    }
+
+    private void openIndexFile() throws IOException{
+        if(indexFile==null){
+            file=new File(directory,NAME_PREFIX+name);
+            indexFile=new RandomAccessFile(file,"rw");
+        }
+    }
+
+    private HashBin getBin(Object key){
+        int hash=hash(key);
+        int i=indexFor(hash,bins.length);
+        return getBin(i);
+    }
+
+    static int hash(Object x){
+        int h=x.hashCode();
+        h+=~(h<<9);
+        h^=(h>>>14);
+        h+=(h<<4);
+        h^=(h>>>10);
+        return h;
+    }
+
+    /**
+     * Map a hash value to a slot by masking it with length-1.
+     * 
+     * @param h the hash value
+     * @param length the table length; the mask reaches every slot only when this is a power of two
+     *            (the constructor rounds the requested bin count up to one)
+     * @return h&(length-1)
+     */
+    static int indexFor(int h,int length){
+        return h&(length-1);
+    }
+    static{
+        DEFAULT_PAGE_SIZE=Integer.parseInt(System.getProperty("defaultPageSize","16384"));
+        DEFAULT_KEY_SIZE=Integer.parseInt(System.getProperty("defaultKeySize","96"));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,245 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.hash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * A page of hash entries within a HashIndex, tagged with the id of the bin it belongs to
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPage{
+
+    static final int PAGE_HEADER_SIZE=17;
+
+    
+   
+    private int maximumEntries;
+    private long id;
+    private int binId;
+    private int persistedSize = 0;
+    private List<HashEntry> hashIndexEntries;
+    /*
+     * for persistence only
+     */
+    private long nextFreePageId=HashEntry.NOT_SET;
+    private boolean active=true;
+
+    /**
+     * Constructor
+     * 
+     * @param id the id to give the page
+     * @param maximumEntries the maximum number of entries, also used as the initial list capacity
+     */
+    HashPage(long id,int maximumEntries){
+        this(maximumEntries);
+       
+        this.id=id;
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param maximumEntries
+     */
+    public HashPage(int maximumEntries){
+        this.maximumEntries=maximumEntries;
+        this.hashIndexEntries=new ArrayList<HashEntry>(maximumEntries);
+    }
+
+    public String toString(){
+        return "HashPage["+getId()+":" + binId + "]";
+    }
+
+    public boolean equals(Object o){
+        boolean result=false;
+        if(o instanceof HashPage){
+            HashPage other=(HashPage)o;
+            result=other.id==id;
+        }
+        return result;
+    }
+
+    public int hashCode(){
+        return (int)id;
+    }
+
+    boolean isActive(){
+        return this.active;
+    }
+
+    void setActive(boolean active){
+        this.active=active;
+    }
+
+    long getNextFreePageId(){
+        return this.nextFreePageId;
+    }
+
+    void setNextFreePageId(long nextPageId){
+        this.nextFreePageId=nextPageId;
+    }
+
+    long getId(){
+        return id;
+    }
+
+    void setId(long id){
+        this.id=id;
+    }
+    
+    int getPersistedSize() {
+        return persistedSize;
+    }
+
+    void write(Marshaller keyMarshaller,DataOutput dataOut) throws IOException{
+        writeHeader(dataOut);
+        dataOut.writeInt(hashIndexEntries.size());
+        for(HashEntry entry:hashIndexEntries){
+            entry.write(keyMarshaller,dataOut);
+        }
+    }
+
+    void read(Marshaller keyMarshaller,DataInput dataIn) throws IOException{
+        readHeader(dataIn);
+        int size=dataIn.readInt();
+        hashIndexEntries.clear();
+        for(int i=0;i<size;i++){
+            HashEntry entry=new HashEntry();
+            entry.read(keyMarshaller,dataIn);
+            hashIndexEntries.add(entry);
+        }
+    }
+
+    /**
+     * Read the page header in the order writeHeader emits it: active flag, next free page id,
+     * bin id and entry count. The count is kept in persistedSize; the entry list is not touched.
+     * 
+     * @param dataIn source positioned at the start of a page header
+     * @throws IOException if the header cannot be read
+     */
+    void readHeader(DataInput dataIn) throws IOException{
+        active=dataIn.readBoolean();
+        nextFreePageId=dataIn.readLong();
+        binId=dataIn.readInt();
+        persistedSize=dataIn.readInt();
+    }
+
+    /**
+     * Write the page header: active flag (1 byte), next free page id (8 bytes), bin id (4 bytes)
+     * and the current number of entries (4 bytes) - 17 bytes, matching PAGE_HEADER_SIZE.
+     * 
+     * @param dataOut destination for the header
+     * @throws IOException if the header cannot be written
+     */
+    void writeHeader(DataOutput dataOut) throws IOException{
+        dataOut.writeBoolean(isActive());
+        dataOut.writeLong(nextFreePageId);
+        dataOut.writeInt(binId);
+        // the live entry count is written, not the persistedSize read earlier
+        dataOut.writeInt(size());
+    }
+
+    boolean isEmpty(){
+        return hashIndexEntries.isEmpty();
+    }
+
+    boolean isFull(){
+        return(hashIndexEntries.size()>=maximumEntries);
+    }
+
+    boolean isUnderflowed(){
+        return hashIndexEntries.size()<(maximumEntries/2);
+    }
+
+    boolean isOverflowed(){
+        return hashIndexEntries.size()>maximumEntries;
+    }
+
+    List<HashEntry> getEntries(){
+        return hashIndexEntries;
+    }
+
+    void setEntries(List<HashEntry> newEntries){
+        this.hashIndexEntries=newEntries;
+    }
+
+    int getMaximumEntries(){
+        return this.maximumEntries;
+    }
+
+    void setMaximumEntries(int maximumEntries){
+        this.maximumEntries=maximumEntries;
+    }
+
+    int size(){
+        return hashIndexEntries.size();
+    }
+
+    
+    void reset() throws IOException{
+        hashIndexEntries.clear();
+        setNextFreePageId(HashEntry.NOT_SET);
+    }
+
+        
+
+    void addHashEntry(int index,HashEntry entry) throws IOException{
+        hashIndexEntries.add(index,entry);
+    }
+    
+
+    HashEntry getHashEntry(int index){
+        HashEntry result=hashIndexEntries.get(index);
+        return result;
+    }
+
+    HashEntry removeHashEntry(int index) throws IOException{
+        HashEntry result=hashIndexEntries.remove(index);
+        return result;
+    }
+
+    void removeAllTreeEntries(List<HashEntry> c){
+        hashIndexEntries.removeAll(c);
+    }
+
+    List<HashEntry> getSubList(int from,int to){
+        return new ArrayList<HashEntry>(hashIndexEntries.subList(from,to));
+    }
+
+
+    
+    /**
+     * @return the binId
+     */
+    int getBinId(){
+        return this.binId;
+    }
+
+    
+    /**
+     * @param binId the binId to set
+     */
+    void setBinId(int binId){
+        this.binId=binId;
+    }
+    
+    
+    /**
+     * Print this page followed by each of its entries, comma-terminated, to standard output.
+     */
+    void dump() {
+        // build the line in one buffer instead of allocating a new String per entry
+        StringBuilder str = new StringBuilder();
+        str.append(this).append(": ");
+        for(HashEntry entry: hashIndexEntries) {
+            str.append(entry).append(",");
+        }
+        System.out.println(str);
+    }
+
+    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,105 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.hash;
+
+import java.io.IOException;
+
+/**
+ * Bookkeeping record for a single HashPage: it keeps the page id and an entry count, and holds the page
+ * itself only between {@link #begin()} and {@link #end()}.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashPageInfo{
+
+    private HashIndex hashIndex;
+    private long id;
+    private int size;
+    private HashPage page;
+
+    /**
+     * @param index the index used to read and write the full page
+     */
+    HashPageInfo(HashIndex index){
+        hashIndex=index;
+    }
+
+    /**
+     * @return the id of the page this info describes
+     */
+    long getId(){
+        return id;
+    }
+
+    /**
+     * @param id the id of the page this info describes
+     */
+    void setId(long id){
+        this.id=id;
+    }
+
+    /**
+     * @return the entry count tracked for the page
+     */
+    int size(){
+        return size;
+    }
+
+    /**
+     * @param size the entry count to record for the page
+     */
+    void setSize(int size){
+        this.size=size;
+    }
+
+    /**
+     * Add an entry to the held page and count it.
+     *
+     * @param index position within the page
+     * @param entry the entry to add
+     * @throws IOException if the page reports an I/O failure
+     */
+    void addHashEntry(int index,HashEntry entry) throws IOException{
+        page.addHashEntry(index,entry);
+        size+=1;
+    }
+
+    /**
+     * @param index position within the held page
+     * @return the entry at that position
+     * @throws IOException declared for callers
+     */
+    HashEntry getHashEntry(int index) throws IOException{
+        HashEntry found=page.getHashEntry(index);
+        return found;
+    }
+
+    /**
+     * Remove an entry from the held page; the count is only decremented when something was removed.
+     *
+     * @param index position within the page
+     * @return the removed entry, or null if the page returned none
+     * @throws IOException if the page reports an I/O failure
+     */
+    HashEntry removeHashEntry(int index) throws IOException{
+        HashEntry removed=page.removeHashEntry(index);
+        if(removed==null){
+            return null;
+        }
+        size-=1;
+        return removed;
+    }
+
+    /**
+     * Print the held page's contents via {@link HashPage#dump()}.
+     */
+    void dump() {
+        page.dump();
+    }
+
+    /**
+     * Make sure the full page is held, reading it through the index if it is not already present.
+     *
+     * @throws IOException if the page cannot be read
+     */
+    void begin() throws IOException{
+        if(page!=null){
+            return;
+        }
+        page=hashIndex.getFullPage(id);
+    }
+
+    /**
+     * Drop the reference to the held page.
+     */
+    void end() {
+        page=null;
+    }
+
+    /**
+     * @return the currently held page, or null when none is held
+     */
+    HashPage getPage() {
+        return page;
+    }
+
+    /**
+     * @param page the page to hold
+     */
+    void setPage(HashPage page) {
+        this.page=page;
+    }
+
+    /**
+     * Write the held page back through the index.
+     *
+     * @throws IOException if the page cannot be written
+     */
+    void save() throws IOException{
+        hashIndex.writeFullPage(page);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,146 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.tree;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Key and index for a BTree. An entry pairs a comparable key with the offset of its index record, plus the
+ * ids of the pages referenced before and after it. Ordering, equality and hashing are all defined by the key
+ * alone.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class TreeEntry implements Comparable{
+
+    static final int NOT_SET=-1;
+    private Comparable key;
+    private long indexOffset;
+    private long prevPageId=NOT_SET;
+    private long nextPageId=NOT_SET;
+
+    /**
+     * Compare by key. The argument may be another TreeEntry (its key is used) or a bare key object.
+     *
+     * @param o a TreeEntry or a key
+     * @return the result of comparing this entry's key with the other key
+     */
+    public int compareTo(Object o){
+        if(o instanceof TreeEntry){
+            TreeEntry other=(TreeEntry)o;
+            return key.compareTo(other.key);
+        }else{
+            return key.compareTo(o);
+        }
+    }
+
+    /**
+     * Two entries (or an entry and a bare key) are equal when their keys compare as equal.
+     *
+     * @param o a TreeEntry or a key; null is never equal
+     * @return true if the keys compare as equal
+     */
+    @Override
+    public boolean equals(Object o){
+        // equals(null) has to answer false instead of failing inside compareTo
+        return o!=null&&compareTo(o)==0;
+    }
+
+    /**
+     * Hash on the key so that hashing agrees with {@link #equals(Object)}.
+     *
+     * @return the key's hash code, or 0 when no key has been set
+     */
+    @Override
+    public int hashCode(){
+        return key!=null?key.hashCode():0;
+    }
+
+    /**
+     * Misspelled predecessor of {@link #hashCode()}, retained so existing callers keep compiling.
+     *
+     * @return same value as {@link #hashCode()}
+     * @deprecated use {@link #hashCode()}
+     */
+    public int hasCode(){
+        return hashCode();
+    }
+
+    @Override
+    public String toString(){
+        return "TreeEntry("+key+","+indexOffset+")prev="+prevPageId+",next="+nextPageId;
+    }
+
+    /**
+     * Clear both page references back to {@link #NOT_SET}.
+     */
+    void reset(){
+        prevPageId=nextPageId=NOT_SET;
+    }
+
+    /**
+     * @return a new entry carrying the same key reference, index offset and page references
+     */
+    TreeEntry copy(){
+        TreeEntry copy=new TreeEntry();
+        copy.key=this.key;
+        copy.indexOffset=this.indexOffset;
+        copy.prevPageId=this.prevPageId;
+        copy.nextPageId=this.nextPageId;
+        return copy;
+    }
+
+    /**
+     * @return the key
+     */
+    Comparable getKey(){
+        return this.key;
+    }
+
+    /**
+     * @param key the key to set
+     */
+    void setKey(Comparable key){
+        this.key=key;
+    }
+
+    /**
+     * @return the nextPageId
+     */
+    long getNextPageId(){
+        return this.nextPageId;
+    }
+
+    /**
+     * @param nextPageId the nextPageId to set
+     */
+    void setNextPageId(long nextPageId){
+        this.nextPageId=nextPageId;
+    }
+
+    /**
+     * @return the prevPageId
+     */
+    long getPrevPageId(){
+        return this.prevPageId;
+    }
+
+    /**
+     * @param prevPageId the prevPageId to set
+     */
+    void setPrevPageId(long prevPageId){
+        this.prevPageId=prevPageId;
+    }
+
+    /**
+     * @return the indexOffset
+     */
+    long getIndexOffset(){
+        return this.indexOffset;
+    }
+
+    /**
+     * @param indexOffset the indexOffset to set
+     */
+    void setIndexOffset(long indexOffset){
+        this.indexOffset=indexOffset;
+    }
+
+    /**
+     * @return true if either the previous or the next page reference is set
+     */
+    boolean hasChildPagesReferences(){
+        return prevPageId!=NOT_SET||nextPageId!=NOT_SET;
+    }
+
+    /**
+     * Serialize this entry: the key (through the marshaller), then indexOffset, nextPageId and prevPageId as
+     * longs. {@link #read(Marshaller, DataInput)} consumes the fields in the same order.
+     *
+     * @param keyMarshaller marshaller used to write the key
+     * @param dataOut destination
+     * @throws IOException if writing fails
+     */
+    void write(Marshaller keyMarshaller,DataOutput dataOut) throws IOException{
+        keyMarshaller.writePayload(key,dataOut);
+        dataOut.writeLong(indexOffset);
+        dataOut.writeLong(nextPageId);
+        dataOut.writeLong(prevPageId);
+    }
+
+    /**
+     * Populate this entry from the layout produced by {@link #write(Marshaller, DataOutput)}.
+     *
+     * @param keyMarshaller marshaller used to read the key
+     * @param dataIn source
+     * @throws IOException if reading fails
+     */
+    void read(Marshaller keyMarshaller,DataInput dataIn) throws IOException{
+        key=(Comparable)keyMarshaller.readPayload(dataIn);
+        indexOffset=dataIn.readLong();
+        nextPageId=dataIn.readLong();
+        prevPageId=dataIn.readLong();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java?view=auto&rev=503176
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java Fri Feb  2 22:53:59 2007
@@ -0,0 +1,412 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl.index.tree;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.index.Index;
+import org.apache.activemq.kaha.impl.index.IndexManager;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.LRUCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * BTree implementation. Pages are fixed-size records in a single random access file; a page's id is its byte
+ * offset in that file. Pages that are not active are chained into a free list through their next-free-page
+ * id and are reused before the file is extended.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class TreeIndex implements Index{
+
+    private static final String NAME_PREFIX="tree-index-";
+    private static final int DEFAULT_PAGE_SIZE;
+    private static final int DEFAULT_KEY_SIZE;
+    private static final Log log=LogFactory.getLog(TreeIndex.class);
+    private final String name;
+    private File directory;
+    private File file;
+    private RandomAccessFile indexFile;
+    private IndexManager indexManager;
+    private int pageSize=DEFAULT_PAGE_SIZE;
+    private int keySize=DEFAULT_KEY_SIZE;
+    private int keysPerPage=pageSize/keySize;
+    private TreePage root;
+    private LRUCache<Long,TreePage> pageCache;
+    private DataByteArrayInputStream dataIn;
+    private DataByteArrayOutputStream dataOut;
+    private byte[] readBuffer;
+    private Marshaller keyMarshaller;
+    private long length=0;
+    private TreePage firstFree;
+    private TreePage lastFree;
+    private AtomicBoolean loaded=new AtomicBoolean();
+    private boolean enablePageCaching=true;
+    private int pageCacheSize=10;
+
+    /**
+     * Constructor. Opens (creating if necessary) the index file named {@code tree-index-<name>} in the given
+     * directory; pages are not read until {@link #load()} is called.
+     * 
+     * @param directory directory holding the index file
+     * @param name suffix of the index file name
+     * @param indexManager used to resolve stored offsets into StoreEntry objects
+     * @throws IOException if the index file cannot be opened
+     */
+    public TreeIndex(File directory,String name,IndexManager indexManager) throws IOException{
+        this.directory=directory;
+        this.name=name;
+        this.indexManager=indexManager;
+        pageCache=new LRUCache<Long,TreePage>(pageCacheSize,pageCacheSize,0.75f,true);
+        openIndexFile();
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller marshaller used to read and write keys
+     * @throws RuntimeException if the index is already loaded
+     */
+    public void setKeyMarshaller(Marshaller marshaller){
+        if(loaded.get()){
+            throw new RuntimeException("Pages already loaded - can't set marshaller now");
+        }
+        this.keyMarshaller=marshaller;
+    }
+
+    /**
+     * @return the keySize
+     */
+    public int getKeySize(){
+        return this.keySize;
+    }
+
+    /**
+     * @param keySize the keySize to set
+     * @throws RuntimeException if the index is already loaded; the key size is left unchanged in that case
+     */
+    public void setKeySize(int keySize){
+        // reject before assigning, so a refused call cannot leave a key size that disagrees with keysPerPage
+        if(loaded.get()){
+            throw new RuntimeException("Pages already loaded - can't reset key size");
+        }
+        this.keySize=keySize;
+    }
+
+    /**
+     * @return the pageSize
+     */
+    public int getPageSize(){
+        return this.pageSize;
+    }
+
+    /**
+     * @param pageSize the pageSize to set
+     * @throws RuntimeException if the index is already loaded and the value differs from the current one
+     */
+    public void setPageSize(int pageSize){
+        if(loaded.get()&&pageSize!=this.pageSize){
+            throw new RuntimeException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize=pageSize;
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean isTransient(){
+        return false;
+    }
+
+    /**
+     * @return the enablePageCaching
+     */
+    public boolean isEnablePageCaching(){
+        return this.enablePageCaching;
+    }
+
+    /**
+     * @param enablePageCaching the enablePageCaching to set
+     */
+    public void setEnablePageCaching(boolean enablePageCaching){
+        this.enablePageCaching=enablePageCaching;
+    }
+
+    /**
+     * @return the pageCacheSize
+     */
+    public int getPageCacheSize(){
+        return this.pageCacheSize;
+    }
+
+    /**
+     * @param pageCacheSize the pageCacheSize to set; also applied to the page cache as its maximum size
+     */
+    public void setPageCacheSize(int pageCacheSize){
+        this.pageCacheSize=pageCacheSize;
+        pageCache.setMaxCacheSize(pageCacheSize);
+    }
+
+    /**
+     * Load the index if it is not loaded yet: scan the headers of all complete pages in the file, chain the
+     * inactive ones into the free list, read the first active root page found, and create a root if there is
+     * none.
+     * 
+     * @throws RuntimeException wrapping any IOException raised while reading the file
+     */
+    public void load(){
+        if(loaded.compareAndSet(false,true)){
+            keysPerPage=pageSize/keySize;
+            dataIn=new DataByteArrayInputStream();
+            dataOut=new DataByteArrayOutputStream(pageSize);
+            readBuffer=new byte[pageSize];
+            try{
+                openIndexFile();
+                long offset=0;
+                // only whole pages are considered; a trailing partial page is ignored
+                while((offset+pageSize)<=indexFile.length()){
+                    indexFile.seek(offset);
+                    indexFile.readFully(readBuffer,0,TreePage.PAGE_HEADER_SIZE);
+                    dataIn.restart(readBuffer);
+                    TreePage page=new TreePage(keysPerPage);
+                    page.setTree(this);
+                    page.setId(offset);
+                    page.readHeader(dataIn);
+                    if(!page.isActive()){
+                        if(lastFree!=null){
+                            // append to the free list and persist the updated link on the previous tail
+                            lastFree.setNextFreePageId(offset);
+                            indexFile.seek(lastFree.getId());
+                            dataOut.reset();
+                            lastFree.writeHeader(dataOut);
+                            indexFile.write(dataOut.getData(),0,TreePage.PAGE_HEADER_SIZE);
+                            lastFree=page;
+                        }else{
+                            lastFree=firstFree=page;
+                        }
+                    }else if(root==null&&page.isRoot()){
+                        root=getFullPage(offset);
+                    }
+                    offset+=pageSize;
+                }
+                length=offset;
+                if(root==null){
+                    root=createRoot();
+                }
+            }catch(IOException e){
+                log.error("Failed to load index ",e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Unload the index if it is loaded: close the file and drop the cache, the root and the free list.
+     * 
+     * @throws IOException if closing the file fails
+     */
+    public void unload() throws IOException{
+        if(loaded.compareAndSet(true,false)){
+            if(indexFile!=null){
+                indexFile.close();
+                indexFile=null;
+                pageCache.clear();
+                root=null;
+                firstFree=lastFree=null;
+            }
+        }
+    }
+
+    /**
+     * Store the offset of the given value under the key.
+     * 
+     * @param key the key; must be Comparable
+     * @param value the entry whose offset is recorded
+     * @throws IOException if the tree cannot be updated
+     */
+    public void store(Object key,StoreEntry value) throws IOException{
+        TreeEntry entry=new TreeEntry();
+        entry.setKey((Comparable)key);
+        entry.setIndexOffset(value.getOffset());
+        root.put(entry);
+    }
+
+    /**
+     * @param key the key; must be Comparable
+     * @return the StoreEntry resolved from the offset stored under the key, or null if the key is not found
+     * @throws IOException if the tree or the index manager cannot be read
+     */
+    public StoreEntry get(Object key) throws IOException{
+        TreeEntry entry=new TreeEntry();
+        entry.setKey((Comparable)key);
+        TreeEntry result=root.find(entry);
+        return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
+    }
+
+    /**
+     * @param key the key to remove; must be Comparable
+     * @throws IOException if the tree cannot be updated
+     */
+    public void remove(Object key) throws IOException{
+        TreeEntry entry=new TreeEntry();
+        entry.setKey((Comparable)key);
+        root.remove(entry);
+    }
+
+    /**
+     * @param key the key to look for; must be Comparable
+     * @return true if an entry is found for the key
+     * @throws IOException if the tree cannot be read
+     */
+    public boolean containsKey(Object key) throws IOException{
+        TreeEntry entry=new TreeEntry();
+        entry.setKey((Comparable)key);
+        return root.find(entry)!=null;
+    }
+
+    /**
+     * Empty the index by deleting the file and loading a fresh one.
+     * 
+     * @throws IOException if the file cannot be closed or reopened
+     */
+    public void clear() throws IOException{
+        unload();
+        delete();
+        openIndexFile();
+        load();
+    }
+
+    /**
+     * Unload the index and delete its file. A failed deletion is logged rather than thrown.
+     * 
+     * @throws IOException if unloading fails
+     */
+    public void delete() throws IOException{
+        unload();
+        if(file.exists()&&!file.delete()){
+            // deletion stays best-effort, but a leftover file should not go unnoticed
+            log.warn("Failed to delete index file "+file);
+        }
+        length=0;
+    }
+
+    /**
+     * @return the root
+     */
+    TreePage getRoot(){
+        return this.root;
+    }
+
+    /**
+     * Find a page by id, trying the root, then the cache, then the file. Pages read from the file are cached.
+     * 
+     * @param pageId the page id (file offset); negative ids yield null
+     * @return the page, or null for a negative id
+     * @throws IOException if the page cannot be read
+     * @throws IllegalStateException if the page read from the file is not active
+     */
+    TreePage lookupPage(long pageId) throws IOException{
+        TreePage result=null;
+        if(pageId>=0){
+            if(root!=null&&root.getId()==pageId){
+                result=root;
+            }else{
+                result=getFromCache(pageId);
+            }
+            if(result==null){
+                result=getFullPage(pageId);
+                if(result!=null){
+                    if(result.isActive()){
+                        addToCache(result);
+                    }else{
+                        throw new IllegalStateException("Trying to access an inactive page: "+pageId+" root is "+root);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Create a page with parent id -1 and make it the root.
+     * 
+     * @return the new root
+     * @throws IOException if the page cannot be created
+     */
+    TreePage createRoot() throws IOException{
+        TreePage result=createPage(-1);
+        root=result;
+        return result;
+    }
+
+    /**
+     * Obtain a page, reusing one from the free list when available and otherwise appending a new page to the
+     * file.
+     * 
+     * @param parentId parent id recorded on a newly allocated page
+     * @return the page, already added to the cache
+     * @throws IOException if the file cannot be read or written
+     */
+    TreePage createPage(long parentId) throws IOException{
+        TreePage result=getNextFreePage();
+        // NOTE(review): a page taken from the free list does not get parentId applied here - confirm that
+        // callers set it themselves
+        if(result==null){
+            // allocate one
+            result=new TreePage(keysPerPage);
+            result.setId(length);
+            result.setTree(this);
+            result.setParentId(parentId);
+            writePage(result);
+            length+=pageSize;
+            // a single byte written at the new end extends the file so it covers the whole new page
+            indexFile.seek(length);
+            indexFile.write(TreeEntry.NOT_SET);
+        }
+        addToCache(result);
+        return result;
+    }
+
+    /**
+     * Return a page to the free list: evict it from the cache, reset it, mark it inactive, append it to the
+     * tail of the free list and persist the affected headers.
+     * 
+     * @param page the page to release
+     * @throws IOException if a header cannot be written
+     */
+    void releasePage(TreePage page) throws IOException{
+        removeFromCache(page);
+        page.reset();
+        page.setActive(false);
+        if(lastFree==null){
+            firstFree=lastFree=page;
+        }else{
+            lastFree.setNextFreePageId(page.getId());
+            writePage(lastFree);
+            // advance the tail as load() does; otherwise the next release overwrites this link and the
+            // page released here drops out of the free list
+            lastFree=page;
+        }
+        writePage(page);
+    }
+
+    /**
+     * Take the head of the free list, if any, and reactivate it.
+     * 
+     * @return the reactivated page, or null when the free list is empty
+     * @throws IOException if a page header cannot be read or saved
+     */
+    private TreePage getNextFreePage() throws IOException{
+        TreePage result=null;
+        if(firstFree!=null){
+            if(firstFree.equals(lastFree)){
+                result=firstFree;
+                firstFree=lastFree=null;
+            }else{
+                result=firstFree;
+                long nextFreeId=firstFree.getNextFreePageId();
+                if(nextFreeId<0){
+                    // no successor recorded: the list ends here, and seeking to a negative offset would fail
+                    firstFree=lastFree=null;
+                }else{
+                    firstFree=getPage(nextFreeId);
+                }
+            }
+            result.setActive(true);
+            result.reset();
+            result.saveHeader();
+        }
+        return result;
+    }
+
+    /**
+     * Write a complete page (header and entries) at the page's offset.
+     * 
+     * @param page the page to write
+     * @throws IOException if the serialized page exceeds the page size or the write fails
+     */
+    void writeFullPage(TreePage page) throws IOException{
+        dataOut.reset();
+        page.write(keyMarshaller,dataOut);
+        if(dataOut.size()>pageSize){
+            throw new IOException("Page Size overflow: pageSize is "+pageSize+" trying to write "+dataOut.size());
+        }
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(),0,dataOut.size());
+    }
+
+    /**
+     * Write only the header of a page at the page's offset.
+     * 
+     * @param page the page whose header is written
+     * @throws IOException if the write fails
+     */
+    void writePage(TreePage page) throws IOException{
+        dataOut.reset();
+        page.writeHeader(dataOut);
+        indexFile.seek(page.getId());
+        indexFile.write(dataOut.getData(),0,TreePage.PAGE_HEADER_SIZE);
+    }
+
+    /**
+     * Read a complete page (header and entries) from the file.
+     * 
+     * @param id the page id (file offset)
+     * @return a new page object populated from the file
+     * @throws IOException if the read fails
+     */
+    TreePage getFullPage(long id) throws IOException{
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer,0,pageSize);
+        dataIn.restart(readBuffer);
+        TreePage page=new TreePage(keysPerPage);
+        page.setId(id);
+        page.setTree(this);
+        page.read(keyMarshaller,dataIn);
+        return page;
+    }
+
+    /**
+     * Read only the header of a page from the file.
+     * 
+     * @param id the page id (file offset)
+     * @return a new page object with its header populated
+     * @throws IOException if the read fails
+     */
+    TreePage getPage(long id) throws IOException{
+        indexFile.seek(id);
+        indexFile.readFully(readBuffer,0,TreePage.PAGE_HEADER_SIZE);
+        dataIn.restart(readBuffer);
+        TreePage page=new TreePage(keysPerPage);
+        page.setId(id);
+        page.setTree(this);
+        page.readHeader(dataIn);
+        return page;
+    }
+
+    /**
+     * @param pageId the page id
+     * @return the cached page, or null when it is not cached or caching is disabled
+     */
+    private TreePage getFromCache(long pageId){
+        TreePage result=null;
+        if(enablePageCaching){
+            result=pageCache.get(pageId);
+        }
+        return result;
+    }
+
+    /**
+     * Cache the page under its id when caching is enabled.
+     * 
+     * @param page the page to cache
+     */
+    private void addToCache(TreePage page){
+        if(enablePageCaching){
+            pageCache.put(page.getId(),page);
+        }
+    }
+
+    /**
+     * Evict the page from the cache when caching is enabled.
+     * 
+     * @param page the page to evict
+     */
+    private void removeFromCache(TreePage page){
+        if(enablePageCaching){
+            pageCache.remove(page.getId());
+        }
+    }
+
+    /**
+     * Open the index file for reading and writing unless it is already open.
+     * 
+     * @throws IOException if the file cannot be opened
+     */
+    protected void openIndexFile() throws IOException{
+        if(indexFile==null){
+            file=new File(directory,NAME_PREFIX+name);
+            indexFile=new RandomAccessFile(file,"rw");
+        }
+    }
+    static{
+        // defaults can be overridden through system properties
+        DEFAULT_PAGE_SIZE=Integer.parseInt(System.getProperty("defaultPageSize","16384"));
+        DEFAULT_KEY_SIZE=Integer.parseInt(System.getProperty("defaultKeySize","96"));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message