activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r387586 [2/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/kaha/
Date Tue, 21 Mar 2006 17:22:06 GMT
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,822 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Implementation of a ListContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ListContainerImpl implements ListContainer{
+    private static final Log log=LogFactory.getLog(MapContainerImpl.class);
+    protected StoreImpl store;
+    protected LocatableItem root;
+    protected Object id;
+    protected LinkedList list=new LinkedList();
+    protected boolean loaded=false;
+    protected Marshaller marshaller=new ObjectMarshaller();
+    protected boolean closed = false;
+
+    protected ListContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
+        this.id=id;
+        this.store=rfs;
+        this.root=root;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#load()
+     */
+    public void load(){
+        checkClosed();
+        if(!loaded){
+            loaded=true;
+            long start=root.getNextItem();
+            if(start!=Item.POSITION_NOT_SET){
+                try{
+                    long nextItem=start;
+                    while(nextItem!=Item.POSITION_NOT_SET){
+                        LocatableItem item=new LocatableItem();
+                        item.setOffset(nextItem);
+                        store.readLocation(item);
+                        list.add(item);
+                        nextItem=item.getNextItem();
+                    }
+                }catch(IOException e){
+                    log.error("Failed to load container "+getId(),e);
+                    throw new RuntimeStoreException(e);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#unload()
+     */
+    public void unload(){
+        checkClosed();
+        if(loaded){
+            loaded = false;
+            list.clear();
+        }
+    }
+    
+    public void close(){
+        unload();
+        closed = true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#isLoaded()
+     */
+    public boolean isLoaded(){
+        checkClosed();
+        return loaded;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#setKeyMarshaller(org.apache.activemq.kaha.Marshaller)
+     */
+    public void setMarshaller(Marshaller marshaller){
+        checkClosed();
+        this.marshaller=marshaller;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#getId()
+     */
+    public Object getId(){
+        checkClosed();
+        return id;
+    }
+    
+    public boolean equals(Object obj){
+        checkLoaded();
+        checkClosed();
+        boolean result = false;
+        if (obj != null && obj instanceof List){
+            List other = (List) obj;
+            synchronized(list){
+            result = other.size() == size();
+            if (result){
+                for (int i =0; i < list.size(); i++){
+                    Object o1 = other.get(i);
+                    Object o2 = get(i);
+                    result = o1 == o2 || (o1 != null && o2 != null && o1.equals(o2));
+                    if (!result) break;
+                }
+            }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#size()
+     */
+    public int size(){
+        checkClosed();
+        checkLoaded();
+        return list.size();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
+     */
+    public void addFirst(Object o){
+        checkClosed();
+        checkLoaded();
+        LocatableItem item=writeFirst(o);
+        synchronized(list){
+            list.addFirst(item);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
+     */
+    public void addLast(Object o){
+        checkClosed();
+        checkLoaded();
+        LocatableItem item=writeLast(o);
+        synchronized(list){
+            list.addLast(item);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#removeFirst()
+     */
+    public Object removeFirst(){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(list){
+            LocatableItem item=(LocatableItem) list.getFirst();
+            if(item!=null){
+                result=getValue(item);
+                int index=list.indexOf(item);
+                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                list.removeFirst();
+                delete(item,prev,next);
+                item=null;
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#removeLast()
+     */
+    public Object removeLast(){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(list){
+            LocatableItem item=(LocatableItem) list.getLast();
+            if(item!=null){
+                result=getValue(item);
+                int index=list.indexOf(item);
+                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+                LocatableItem next=null;
+                list.removeLast();
+                delete(item,prev,next);
+                item=null;
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#isEmpty()
+     */
+    public boolean isEmpty(){
+        checkClosed();
+        checkLoaded();
+        return list.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#contains(java.lang.Object)
+     */
+    public boolean contains(Object o){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        if(o!=null){
+            synchronized(list){
+                for(Iterator i=list.iterator();i.hasNext();){
+                    LocatableItem item=(LocatableItem) i.next();
+                    Object value=getValue(item);
+                    if(value!=null&&value.equals(o)){
+                        result=true;
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#iterator()
+     */
+    public Iterator iterator(){
+        checkClosed();
+        checkLoaded();
+        return listIterator();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#toArray()
+     */
+    public Object[] toArray(){
+        checkClosed();
+        checkLoaded();
+        List tmp=new ArrayList(list.size());
+        synchronized(list){
+            for(Iterator i=list.iterator();i.hasNext();){
+                LocatableItem item=(LocatableItem) i.next();
+                Object value=getValue(item);
+                tmp.add(value);
+            }
+        }
+        return tmp.toArray();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#toArray(T[])
+     */
+    public Object[] toArray(Object[] a){
+        checkClosed();
+        checkLoaded();
+        List tmp=new ArrayList(list.size());
+        synchronized(list){
+            for(Iterator i=list.iterator();i.hasNext();){
+                LocatableItem item=(LocatableItem) i.next();
+                Object value=getValue(item);
+                tmp.add(value);
+            }
+        }
+        return tmp.toArray(a);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#add(E)
+     */
+    public boolean add(Object o){
+        checkClosed();
+        checkLoaded();
+        addLast(o);
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#remove(java.lang.Object)
+     */
+    public boolean remove(Object o){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        synchronized(list){
+            for(Iterator i=list.iterator();i.hasNext();){
+                LocatableItem item=(LocatableItem) i.next();
+                Object value = getValue(item);
+                if (value != null && value.equals(o)){
+                    remove(item);
+                    break;
+                }
+            }
+            
+        }
+        return result;
+    }
+
+    protected void remove(LocatableItem item){
+        synchronized(list){
+            int index=list.indexOf(item);
+            LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+            LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+            list.remove(index);
+            delete(item,prev,next);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#containsAll(java.util.Collection)
+     */
+    public boolean containsAll(Collection c){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        synchronized(list){
+            for(Iterator i=c.iterator();i.hasNext();){
+                Object obj=i.next();
+                if(!(result=contains(obj))){
+                    result=false;
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#addAll(java.util.Collection)
+     */
+    public boolean addAll(Collection c){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        for(Iterator i=c.iterator();i.hasNext();){
+            add(i.next());
+            result=true;
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#addAll(int, java.util.Collection)
+     */
+    public boolean addAll(int index,Collection c){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        ListIterator e1=listIterator(index);
+        Iterator e2=c.iterator();
+        while(e2.hasNext()){
+            e1.add(e2.next());
+            result=true;
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#removeAll(java.util.Collection)
+     */
+    public boolean removeAll(Collection c){
+        checkClosed();
+        checkLoaded();
+        boolean result=true;
+        for(Iterator i=c.iterator();i.hasNext();){
+            Object obj=i.next();
+            result&=remove(obj);
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#retainAll(java.util.Collection)
+     */
+    public boolean retainAll(Collection c){
+        checkClosed();
+        checkLoaded();
+        List tmpList=new ArrayList();
+        synchronized(list){
+        for(Iterator i = list.iterator(); i.hasNext();){
+            LocatableItem item = (LocatableItem) i.next();
+            Object o = getValue(item);
+            
+            if(!c.contains(o)){
+                tmpList.add(o);
+            }
+        }
+        }
+        for(Iterator i=tmpList.iterator();i.hasNext();){
+            remove(i.next());
+        }
+        return !tmpList.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#clear()
+     */
+    public void clear(){
+        checkClosed();
+        synchronized(list){
+            list.clear();
+            try {
+            long start=root.getNextItem();
+            if(start!=Item.POSITION_NOT_SET){
+                long nextItem=start;
+                while(nextItem!=Item.POSITION_NOT_SET){
+                    LocatableItem item=new LocatableItem();
+                    item.setOffset(nextItem);
+                    list.add(item);
+                    nextItem=item.getNextItem();
+                }
+            }
+            root.setNextItem(Item.POSITION_NOT_SET);
+            store.updateItem(root);
+            for(int i=0;i<list.size();i++){
+                LocatableItem item=(LocatableItem) list.get(i);
+                store.removeItem(item);
+            }
+            list.clear();
+            }catch(IOException e){
+                log.error("Failed to clear ListContainer "+getId(),e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#get(int)
+     */
+    public Object get(int index){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        LocatableItem item=(LocatableItem) list.get(index);
+        if(item!=null){
+            result=getValue(item);
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#set(int, E)
+     */
+    public Object set(int index,Object element){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(list){
+            LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
+            LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
+            LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+            result=getValue(replace);
+            list.remove(index);
+            delete(replace,prev,next);
+            add(index,element);
+        }
+        return result;
+    }
+    
+    protected LocatableItem internalSet(int index,Object element){
+        synchronized(list){
+            LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
+            LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
+            LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+            list.remove(index);
+            delete(replace,prev,next);
+            return internalAdd(index,element);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#add(int, E)
+     */
+    public void add(int index,Object element){
+        checkClosed();
+        checkLoaded();
+        synchronized(list){
+            LocatableItem item=insert(index,element);
+            list.add(index,item);
+        }
+    }
+    
+    protected LocatableItem internalAdd(int index,Object element){
+        synchronized(list){
+            LocatableItem item=insert(index,element);
+            list.add(index,item);
+            return item;
+        }
+    }
+    
+    protected LocatableItem internalGet(int index){
+        synchronized(list){
+            if (index >= 0 && index < list.size()){
+                return (LocatableItem) list.get(index);
+            }
+        }
+        return null;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#doRemove(int)
+     */
+    public boolean doRemove(int index){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        synchronized(list){
+            LocatableItem item=(LocatableItem) list.get(index);
+            if(item!=null){
+                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                list.remove(index);
+                delete(item,prev,next);
+                result=true;
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#remove(int)
+     */
+    public Object remove(int index){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(list){
+            LocatableItem item=(LocatableItem) list.get(index);
+            if(item!=null){
+                result=getValue(item);
+                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                list.remove(index);
+                delete(item,prev,next);
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#indexOf(java.lang.Object)
+     */
+    public int indexOf(Object o){
+        checkClosed();
+        checkLoaded();
+        int result=-1;
+        if(o!=null){
+            synchronized(list){
+                int count=0;
+                for(Iterator i=list.iterator();i.hasNext();count++){
+                    LocatableItem item=(LocatableItem) i.next();
+                    Object value=getValue(item);
+                    if(value!=null&&value.equals(o)){
+                        result=count;
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#lastIndexOf(java.lang.Object)
+     */
+    public int lastIndexOf(Object o){
+        checkClosed();
+        checkLoaded();
+        int result=-1;
+        if(o!=null){
+            synchronized(list){
+                int count=list.size()-1;
+                for(ListIterator i=list.listIterator();i.hasPrevious();count--){
+                    LocatableItem item=(LocatableItem) i.previous();
+                    Object value=getValue(item);
+                    if(value!=null&&value.equals(o)){
+                        result=count;
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#listIterator()
+     */
+    public ListIterator listIterator(){
+        checkClosed();
+        checkLoaded();
+        ListIterator iter = ((List) list.clone()).listIterator();
+        return new ContainerListIterator(this,iter);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#listIterator(int)
+     */
+    public ListIterator listIterator(int index){
+        checkClosed();
+        checkLoaded();
+        List result = (List) list.clone();
+        ListIterator iter = result.listIterator(index);
+        return new ContainerListIterator(this,iter);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#subList(int, int)
+     */
+    public List subList(int fromIndex,int toIndex){
+        checkClosed();
+        checkLoaded();
+        List tmp = list.subList(fromIndex, toIndex);
+        LinkedList result = new LinkedList();
+        for (Iterator i = tmp.iterator(); i.hasNext();){
+            LocatableItem item = (LocatableItem) i.next();
+            result.add(getValue(item));
+        }
+        return result;
+    }
+
+    protected LocatableItem writeLast(Object value){
+        long pos=Item.POSITION_NOT_SET;
+        LocatableItem item=null;
+        try{
+            LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
+            last=last==null?root:last;
+            long prev=last.getOffset();
+            long next=Item.POSITION_NOT_SET;
+            item=new LocatableItem(prev,next,pos);
+            next=store.storeItem(marshaller,value,item);
+            if(last!=null){
+                last.setNextItem(next);
+                store.updateItem(last);
+            }
+        }catch(IOException e){
+            log.error("Failed to write "+value,e);
+            throw new RuntimeStoreException(e);
+        }
+        return item;
+    }
+
+    protected LocatableItem writeFirst(Object value){
+        long pos=Item.POSITION_NOT_SET;
+        LocatableItem item=null;
+        try{
+            LocatableItem next=list.isEmpty()?null:(LocatableItem) list.getFirst();
+            LocatableItem last=root;
+            long prevPos=last.getOffset();
+            long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
+            item=new LocatableItem(prevPos,nextPos,pos);
+            nextPos=store.storeItem(marshaller,value,item);
+            if(last!=null){
+                last.setNextItem(nextPos);
+                store.updateItem(last);
+            }
+            if(next!=null){
+                next.setPreviousItem(nextPos);
+                store.updateItem(next);
+            }
+        }catch(IOException e){
+            log.error("Failed to write "+value,e);
+            throw new RuntimeStoreException(e);
+        }
+        return item;
+    }
+
+    protected LocatableItem insert(int insertPos,Object value){
+        long pos=Item.POSITION_NOT_SET;
+        LocatableItem item=null;
+        try{
+            int lastPos=insertPos-1;
+            LocatableItem prev=(list.isEmpty() || (insertPos-1) < 0)?null:(LocatableItem) list.get(lastPos);
+            LocatableItem next=(list.isEmpty() || (insertPos+1) >= size())?null:(LocatableItem) list.get(insertPos+1);
+            prev=prev==null?root:prev;
+            long prevPos=prev.getOffset();
+            long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
+            item=new LocatableItem(prevPos,nextPos,pos);
+            nextPos=store.storeItem(marshaller,value,item);
+            if(prev!=null){
+                prev.setNextItem(nextPos);
+                store.updateItem(prev);
+            }
+            if(next!=null){
+                next.setPreviousItem(nextPos);
+                store.updateItem(next);
+            }
+        }catch(IOException e){
+            log.error("Failed to insert "+value,e);
+            throw new RuntimeStoreException(e);
+        }
+        return item;
+    }
+
+    protected Object getValue(LocatableItem item){
+        Object result=null;
+        if(item!=null){
+            try{
+                result=store.readItem(marshaller,item);
+            }catch(IOException e){
+                log.error("Failed to get value for "+item,e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+        return result;
+    }
+
+    protected void delete(LocatableItem item,LocatableItem prev,LocatableItem next){
+        try{
+            prev=prev==null?root:prev;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                store.updateItem(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            store.updateItem(prev);
+            store.removeItem(item);
+        }catch(IOException e){
+            log.error("Failed to delete "+item,e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+    
+    protected final void checkClosed(){
+        if (closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+    protected final void checkLoaded(){
+        if (!loaded){
+            throw new RuntimeStoreException("The container is not loaded");
+        }
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,126 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+* A an Item with a relative postion and location to other Items in the Store
+* 
+* @version $Revision: 1.2 $
+*/
+public final class LocatableItem extends Item implements Externalizable{
+    
+  
+    private static final long serialVersionUID=-6888731361600185708L;
+    private long previousItem=POSITION_NOT_SET;
+    private long nextItem=POSITION_NOT_SET;
+    private long referenceItem=POSITION_NOT_SET;
+   
+
+    public LocatableItem(){}
+
+    public LocatableItem(long prev,long next,long objOffset) throws IOException{
+        this.previousItem=prev;
+        this.nextItem=next;
+        this.referenceItem=objOffset;
+    }
+
+
+    public void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
+        dataOut.writeLong(previousItem);
+        dataOut.writeLong(nextItem);
+        dataOut.writeLong(referenceItem);
+        super.writePayload(marshaller,object,dataOut);
+    }
+
+    public Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
+        previousItem=dataIn.readLong();
+        nextItem=dataIn.readLong();
+        referenceItem=dataIn.readLong();
+        return super.readPayload(marshaller, dataIn);
+    }
+    
+    void readLocation(DataInput dataIn) throws IOException{
+        previousItem=dataIn.readLong();
+        nextItem=dataIn.readLong();
+        referenceItem=dataIn.readLong();
+    }
+
+    public void writeLocation(DataOutput dataOut) throws IOException{
+        dataOut.writeLong(previousItem);
+        dataOut.writeLong(nextItem);
+    }
+
+    public void setPreviousItem(long newPrevEntry){
+        previousItem=newPrevEntry;
+    }
+
+    public long getPreviousItem(){
+        return previousItem;
+    }
+
+    public void setNextItem(long newNextEntry){
+        nextItem=newNextEntry;
+    }
+
+    public long getNextItem(){
+        return nextItem;
+    }
+
+    public void setReferenceItem(long newObjectOffset){
+        referenceItem=newObjectOffset;
+    }
+
+    public long getReferenceItem(){
+        return referenceItem;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.kaha.impl.Item#toString()
+     */
+    public String toString(){
+        String result=super.toString();
+        result+=" , referenceItem = "+referenceItem+", previousItem = "+previousItem+" , nextItem = "+nextItem;
+        return result;
+    }
+
+    /* (non-Javadoc)
+     * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+     */
+    public void writeExternal(ObjectOutput out) throws IOException{
+        out.writeLong(previousItem);
+        out.writeLong(nextItem);
+        out.writeLong(referenceItem);
+        
+    }
+
+    /* (non-Javadoc)
+     * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+     */
+    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
+       previousItem = in.readLong();
+       nextItem = in.readLong();
+       referenceItem = in.readLong();
+        
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,476 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Implementation of a MapContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class MapContainerImpl implements MapContainer{
+    private static final Log log=LogFactory.getLog(MapContainerImpl.class);
+    protected StoreImpl store;
+    protected LocatableItem root;
+    protected Object id;
+    protected Map map=new HashMap();
+    protected Map valueToKeyMap=new HashMap();
+    protected LinkedList list=new LinkedList();
+    protected boolean loaded=false;
+    protected Marshaller keyMarshaller=new ObjectMarshaller();
+    protected Marshaller valueMarshaller=new ObjectMarshaller();
+    protected final Object mutex=new Object();
+    protected boolean closed=false;
+
+    protected MapContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
+        this.id=id;
+        this.store=rfs;
+        this.root=root;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#load()
+     */
+    public void load(){
+        checkClosed();
+        if(!loaded){
+            loaded=true;
+            synchronized(mutex){
+                try{
+                    long start=root.getNextItem();
+                    if(start!=Item.POSITION_NOT_SET){
+                        long nextItem=start;
+                        while(nextItem!=Item.POSITION_NOT_SET){
+                            LocatableItem item=new LocatableItem();
+                            item.setOffset(nextItem);
+                            Object key=store.readItem(keyMarshaller,item);
+                            map.put(key,item);
+                            valueToKeyMap.put(item,key);
+                            list.add(item);
+                            nextItem=item.getNextItem();
+                        }
+                    }
+                }catch(IOException e){
+                    log.error("Failed to load container "+getId(),e);
+                    throw new RuntimeStoreException(e);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#unload()
+     */
+    public void unload(){
+        checkClosed();
+        if(loaded){
+            loaded=false;
+            synchronized(mutex){
+                map.clear();
+                valueToKeyMap.clear();
+                list.clear();
+            }
+        }
+    }
+
+    public void close(){
+        unload();
+        closed=true;
+    }
+
+    public void setKeyMarshaller(Marshaller keyMarshaller){
+        checkClosed();
+        this.keyMarshaller=keyMarshaller;
+    }
+
+    public void setValueMarshaller(Marshaller valueMarshaller){
+        checkClosed();
+        this.valueMarshaller=valueMarshaller;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#isLoaded()
+     */
+    public boolean isLoaded(){
+        checkClosed();
+        return loaded;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#getId()
+     */
+    public Object getId(){
+        checkClosed();
+        return id;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#size()
+     */
+    public int size(){
+        checkClosed();
+        checkLoaded();
+        return map.size();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#isEmpty()
+     */
+    public boolean isEmpty(){
+        checkClosed();
+        checkLoaded();
+        return map.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
+     */
+    public boolean containsKey(Object key){
+        checkClosed();
+        checkLoaded();
+        synchronized(mutex){
+            return map.containsKey(key);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
+     */
+    public Object get(Object key){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        LocatableItem item=null;
+        synchronized(mutex){
+            item=(LocatableItem) map.get(key);
+        }
+        if(item!=null){
+            result=getValue(item);
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
+     */
+    public boolean containsValue(Object o){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        if(o!=null){
+            synchronized(list){
+                for(Iterator i=list.iterator();i.hasNext();){
+                    LocatableItem item=(LocatableItem) i.next();
+                    Object value=getValue(item);
+                    if(value!=null&&value.equals(o)){
+                        result=true;
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
+     */
+    public void putAll(Map t){
+        checkClosed();
+        checkLoaded();
+        if(t!=null){
+            synchronized(mutex){
+                for(Iterator i=t.entrySet().iterator();i.hasNext();){
+                    Map.Entry entry=(Map.Entry) i.next();
+                    put(entry.getKey(),entry.getValue());
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#keySet()
+     */
+    public Set keySet(){
+        checkClosed();
+        checkLoaded();
+        return new ContainerKeySet(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#values()
+     */
+    public Collection values(){
+        checkClosed();
+        checkLoaded();
+        return new ContainerValueCollection(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#entrySet()
+     */
+    public Set entrySet(){
+        checkClosed();
+        checkLoaded();
+        return new ContainerEntrySet(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
+     */
+    public Object put(Object key,Object value){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(mutex){
+            if(map.containsKey(key)){
+                result=remove(key);
+            }
+            LocatableItem item=write(key,value);
+            map.put(key,item);
+            valueToKeyMap.put(item,key);
+            list.add(item);
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
+     */
+    public Object remove(Object key){
+        checkClosed();
+        checkLoaded();
+        Object result=null;
+        synchronized(mutex){
+            LocatableItem item=(LocatableItem) map.get(key);
+            if(item!=null){
+                map.remove(key);
+                valueToKeyMap.remove(item);
+                result=getValue(item);
+                int index=list.indexOf(item);
+                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
+                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                list.remove(index);
+                {
+                    delete(item,prev,next);
+                }
+                item=null;
+            }
+        }
+        return result;
+    }
+
+    public boolean removeValue(Object o){
+        checkClosed();
+        checkLoaded();
+        boolean result=false;
+        if(o!=null){
+            synchronized(list){
+                for(Iterator i=list.iterator();i.hasNext();){
+                    LocatableItem item=(LocatableItem) i.next();
+                    Object value=getValue(item);
+                    if(value!=null&&value.equals(o)){
+                        result=true;
+                        // find the key
+                        Object key=valueToKeyMap.get(item);
+                        if(key!=null){
+                            remove(key);
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    protected void remove(LocatableItem item){
+        Object key=valueToKeyMap.get(item);
+        if(key!=null){
+            remove(key);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#clear()
+     */
+    public void clear(){
+        checkClosed();
+        synchronized(mutex){
+            loaded=true;
+            synchronized(mutex){
+                map.clear();
+                valueToKeyMap.clear();
+                list.clear();// going to re-use this
+                try{
+                    long start=root.getNextItem();
+                    if(start!=Item.POSITION_NOT_SET){
+                        long nextItem=start;
+                        while(nextItem!=Item.POSITION_NOT_SET){
+                            LocatableItem item=new LocatableItem();
+                            item.setOffset(nextItem);
+                            list.add(item);
+                            nextItem=item.getNextItem();
+                        }
+                    }
+                    root.setNextItem(Item.POSITION_NOT_SET);
+                    store.updateItem(root);
+                    for(int i=0;i<list.size();i++){
+                        LocatableItem item=(LocatableItem) list.get(i);
+                        if(item.getReferenceItem()!=Item.POSITION_NOT_SET){
+                            Item value=new Item();
+                            value.setOffset(item.getReferenceItem());
+                            store.removeItem(value);
+                        }
+                       
+                        store.removeItem(item);
+                    }
+                    list.clear();
+                }catch(IOException e){
+                    log.error("Failed to clear MapContainer "+getId(),e);
+                    throw new RuntimeStoreException(e);
+                }
+            }
+        }
+    }
+
+    protected Set getInternalKeySet(){
+        return new HashSet(map.keySet());
+    }
+
+    protected LinkedList getItemList(){
+        return list;
+    }
+
+    protected Object getValue(LocatableItem item){
+        Object result=null;
+        if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
+            Item rec=new Item();
+            rec.setOffset(item.getReferenceItem());
+            try{
+                result=store.readItem(valueMarshaller,rec);
+            }catch(IOException e){
+                log.error("Failed to get value for "+item,e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+        return result;
+    }
+
+    protected LocatableItem write(Object key,Object value){
+        long pos=Item.POSITION_NOT_SET;
+        LocatableItem item=null;
+        try{
+            if(value!=null){
+                Item valueItem=new Item();
+                pos=store.storeItem(valueMarshaller,value,valueItem);
+            }
+            LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
+            last=last==null?root:last;
+            long prev=last.getOffset();
+            long next=Item.POSITION_NOT_SET;
+            item=new LocatableItem(prev,next,pos);
+            next=store.storeItem(keyMarshaller,key,item);
+            if(last!=null){
+                last.setNextItem(next);
+                store.updateItem(last);
+            }
+        }catch(IOException e){
+            e.printStackTrace();
+            log.error("Failed to write "+key+" , "+value,e);
+            throw new RuntimeStoreException(e);
+        }
+        return item;
+    }
+
+    protected void delete(LocatableItem key,LocatableItem prev,LocatableItem next){
+        try{
+            prev=prev==null?root:prev;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                store.updateItem(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            store.updateItem(prev);
+            if(key.getReferenceItem()!=Item.POSITION_NOT_SET){
+                Item value=new Item();
+                value.setOffset(key.getReferenceItem());
+                store.removeItem(value);
+            }
+            store.removeItem(key);
+        }catch(IOException e){
+            log.error("Failed to delete "+key,e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    protected final void checkClosed(){
+        if(closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+
+    protected final void checkLoaded(){
+        if(!loaded){
+            throw new RuntimeStoreException("The container is not loaded");
+        }
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,97 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+* A container of roots for other Containers
+* 
+* @version $Revision: 1.2 $
+*/
+
+class RootContainer extends MapContainerImpl{
+    private static final Log log=LogFactory.getLog(RootContainer.class);
+    protected static final Marshaller rootMarshaller = new ObjectMarshaller();
+    
+    protected RootContainer(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
+        super(id,rfs,root);
+    }
+
+    protected void addRoot(Object key,LocatableItem er) throws IOException{
+        
+            if(map.containsKey(key)){
+                remove(key);
+            }
+            LocatableItem entry=writeRoot(key,er);
+            map.put(key,entry);
+            synchronized(list){
+            list.add(entry);
+            }
+        
+    }
+
+    protected LocatableItem writeRoot(Object key,LocatableItem value){
+        long pos=Item.POSITION_NOT_SET;
+        LocatableItem item=null;
+        try{
+            if(value!=null){
+                pos=store.storeItem(rootMarshaller,value,value);
+            }
+            LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
+            last=last==null?root:last;
+            long prev=last.getOffset();
+            long next=Item.POSITION_NOT_SET;
+            item=new LocatableItem(prev,next,pos);
+            if(log.isDebugEnabled())
+                log.debug("writing root ...");
+            if(log.isDebugEnabled())
+                log.debug("root = "+value);
+            next=store.storeItem(rootMarshaller,key,item);
+            if(last!=null){
+                last.setNextItem(next);
+                store.updateItem(last);
+            }
+        }catch(IOException e){
+            e.printStackTrace();
+            log.error("Failed to write root",e);
+            throw new RuntimeStoreException(e);
+        }
+        return item;
+    }
+
+    protected Object getValue(LocatableItem item){
+        LocatableItem result=null;
+        if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
+            LocatableItem value=new LocatableItem();
+            value.setOffset(item.getReferenceItem());
+            try{
+                result=(LocatableItem) store.readItem(rootMarshaller,value);
+                //now read the item
+                result.setOffset(item.getReferenceItem());
+                store.readItem(rootMarshaller, result);
+            }catch(IOException e){
+                log.error("Could not read item "+item,e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+        return result;
+    }
+
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,149 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl;
+import java.io.ByteArrayInputStream;
+
+/**
+ * Optimized ByteArrayInputStream that can be used more than once
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StoreByteArrayInputStream extends ByteArrayInputStream {
+    /**
+     * Creates a <code>WireByteArrayInputStream</code>.
+     * 
+     * @param buf the input buffer.
+     */
+    public StoreByteArrayInputStream(byte buf[]) {
+        super(buf);
+    }
+
+    /**
+     * Creates <code>WireByteArrayInputStream</code> that uses <code>buf</code> as its buffer array.
+     * 
+     * @param buf the input buffer.
+     * @param offset the offset in the buffer of the first byte to read.
+     * @param length the maximum number of bytes to read from the buffer.
+     */
+    public StoreByteArrayInputStream(byte buf[], int offset, int length) {
+        super(buf, offset, length);
+    }
+    
+   
+    /**
+     * Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
+     */
+    public StoreByteArrayInputStream() {
+        super(new byte[0]);
+    }
+    
+    /**
+     * @return the current position in the stream
+     */
+    public int position(){
+        return pos;
+    }
+    
+    /**
+     * @return the underlying data array
+     */
+    public byte[] getRawData(){
+        return buf;
+    }
+
+    /**
+     * reset the <code>WireByteArrayInputStream</code> to use an new byte array
+     * 
+     * @param newBuff buffer to use
+     * @param offset the offset in the buffer of the first byte to read.
+     * @param length the maximum number of bytes to read from the buffer.
+     */
+    public void restart(byte[] newBuff, int offset, int length) {
+        buf = newBuff;
+        pos = offset;
+        count = Math.min(offset + length, newBuff.length);
+        mark = offset;
+    }
+
+    /**
+     * reset the <code>WireByteArrayInputStream</code> to use an new byte array
+     * 
+     * @param newBuff
+     */
+    public void restart(byte[] newBuff) {
+        restart(newBuff, 0, newBuff.length);
+    }
+    
+    /**
+     * re-start the input stream - reusing the current buffer
+     * @param size
+     */
+    public void restart(int size){
+        if (buf == null || buf.length < size){
+            buf = new byte[size];
+        }
+        restart(buf);
+    }
+
+    /**
+     * Reads the next byte of data from this input stream. The value byte is returned as an <code>int</code> in the
+     * range <code>0</code> to <code>255</code>. If no byte is available because the end of the stream has been
+     * reached, the value <code>-1</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     * 
+     * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+     */
+    public int read() {
+        return (pos < count) ? (buf[pos++] & 0xff) : -1;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data into an array of bytes from this input stream.
+     * 
+     * @param b the buffer into which the data is read.
+     * @param off the start offset of the data.
+     * @param len the maximum number of bytes read.
+     * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
+     * end of the stream has been reached.
+     */
+    public int read(byte b[], int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (pos >= count) {
+            return -1;
+        }
+        if (pos + len > count) {
+            len = count - pos;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        System.arraycopy(buf, pos, b, off, len);
+        pos += len;
+        return len;
+    }
+
+    /**
+     * @return the number of bytes that can be read from the input stream without blocking.
+     */
+    public int available() {
+        return count - pos;
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,118 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Optimized ByteArrayOutputStream
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StoreByteArrayOutputStream extends ByteArrayOutputStream {
+    /**
+     * Creates a new byte array output stream. 
+     */
+    public StoreByteArrayOutputStream() {
+        super(16 * 1024);
+    }
+
+    /**
+     * Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
+     * 
+     * @param size the initial size.
+     * @exception IllegalArgumentException if size is negative.
+     */
+    public StoreByteArrayOutputStream(int size) {
+        super(size);
+    }
+
+    /**
+     * start using a fresh byte array
+     * 
+     * @param size
+     */
+    public void restart(int size) {
+        buf = new byte[size];
+        count = 0;
+    }
+
+    /**
+     * Writes the specified byte to this byte array output stream.
+     * 
+     * @param b the byte to be written.
+     */
+    public void write(int b) {
+        int newcount = count + 1;
+        if (newcount > buf.length) {
+            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+            System.arraycopy(buf, 0, newbuf, 0, count);
+            buf = newbuf;
+        }
+        buf[count] = (byte) b;
+        count = newcount;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to this byte
+     * array output stream.
+     * 
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     */
+    public void write(byte b[], int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        else if (len == 0) {
+            return;
+        }
+        int newcount = count + len;
+        if (newcount > buf.length) {
+            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+            System.arraycopy(buf, 0, newbuf, 0, count);
+            buf = newbuf;
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count = newcount;
+    }
+
+    /**
+     * @return the underlying byte[] buffer
+     */
+    public byte[] getData() {
+        return buf;
+    }
+    
+    /**
+     * reset the output stream
+     */
+    public void reset(){
+        count = 0;
+    }
+    
+    /**
+     * Set the current position for writing
+     * @param offset
+     */
+    public void position(int offset){
+        if (offset > buf.length) {
+            byte newbuf[] = new byte[Math.max(buf.length << 1, offset)];
+            System.arraycopy(buf, 0, newbuf, 0, count);
+            buf = newbuf;
+        }
+        count = offset;
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,380 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.activemq.kaha.Store;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+/**
+ * Implementation of a Store
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class StoreImpl implements Store{
+    private static final Log log = LogFactory.getLog(StoreImpl.class);
+
+    private final Object mutex=new Object();
+    private RandomAccessFile dataFile;
+    private Map mapContainers=new ConcurrentHashMap();
+    private Map listContainers=new ConcurrentHashMap();
+    private RootContainer rootMapContainer;
+    private RootContainer rootListContainer;
+    private String name;
+    private StoreReader reader;
+    private StoreWriter writer;
+    private FreeSpaceManager freeSpaceManager;
+    protected boolean closed=false;
+    protected Thread shutdownHook;
+
+    public StoreImpl(String name,String mode) throws IOException{
+        this.name=name;
+        this.dataFile=new RandomAccessFile(name,mode);
+        this.reader = new StoreReader(this.dataFile);
+        this.writer = new StoreWriter(this.dataFile);
+        File file = new File(name);
+        log.info("Kaha Store opened " + file.getAbsolutePath());
+        freeSpaceManager=new FreeSpaceManager(this.writer,this.reader);
+        initialization();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#close()
+     */
+    public void close() throws IOException{
+        synchronized(mutex){
+            if(!closed){
+                for(Iterator i=mapContainers.values().iterator();i.hasNext();){
+                    MapContainerImpl container=(MapContainerImpl) i.next();
+                    container.close();
+                }
+                for(Iterator i=listContainers.values().iterator();i.hasNext();){
+                    ListContainerImpl container=(ListContainerImpl) i.next();
+                    container.close();
+                }
+                force();
+                dataFile.close();
+                closed=true;
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#force()
+     */
+    public void force() throws IOException{
+        checkClosed();
+        synchronized(mutex){
+            dataFile.getFD().sync();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#clear()
+     */
+    public void clear(){
+        checkClosed();
+        for(Iterator i=mapContainers.values().iterator();i.hasNext();){
+            MapContainer container=(MapContainer) i.next();
+            container.clear();
+        }
+        for(Iterator i=listContainers.values().iterator();i.hasNext();){
+            ListContainer container=(ListContainer) i.next();
+            container.clear();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#delete()
+     */
+    public boolean delete() throws IOException{
+        checkClosed();
+        dataFile.close();
+        File file=new File(name);
+        return file.delete();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#doesMapContainerExist(java.lang.Object)
+     */
+    public boolean doesMapContainerExist(Object id){
+        return mapContainers.containsKey(id);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#getContainer(java.lang.Object)
+     */
+    public MapContainer getMapContainer(Object id) throws IOException{
+        checkClosed();
+        synchronized(mutex){
+            MapContainer result=(MapContainerImpl) mapContainers.get(id);
+            if(result==null){
+                LocatableItem root=new LocatableItem();
+                rootMapContainer.addRoot(id,root);
+                result=new MapContainerImpl(id,this,root);
+                mapContainers.put(id,result);
+            }
+            return result;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#deleteContainer(java.lang.Object)
+     */
+    public void deleteMapContainer(Object id) throws IOException{
+        checkClosed();
+        synchronized(mutex){
+            if(doesMapContainerExist(id)){
+                MapContainer container=getMapContainer(id);
+                if(container!=null){
+                    container.load();
+                    container.clear();
+                    rootMapContainer.remove(id);
+                    mapContainers.remove(id);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#getContainerKeys()
+     */
+    public Set getMapContainerIds(){
+        checkClosed();
+        return java.util.Collections.unmodifiableSet(mapContainers.keySet());
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#doesListContainerExist(java.lang.Object)
+     */
+    public boolean doesListContainerExist(Object id){
+        return listContainers.containsKey(id);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#getListContainer(java.lang.Object)
+     */
+    public ListContainer getListContainer(Object id) throws IOException{
+        checkClosed();
+        synchronized(mutex){
+            ListContainer result=(ListContainerImpl) listContainers.get(id);
+            if(result==null){
+                LocatableItem root=new LocatableItem();
+                rootListContainer.addRoot(id,root);
+                result=new ListContainerImpl(id,this,root);
+                listContainers.put(id,result);
+            }
+            return result;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#deleteListContainer(java.lang.Object)
+     */
+    public void deleteListContainer(Object id) throws IOException{
+        checkClosed();
+        synchronized(mutex){
+            if(doesListContainerExist(id)){
+                ListContainer container=getListContainer(id);
+                if(container!=null){
+                    container.load();
+                    container.clear();
+                    rootListContainer.remove(id);
+                    listContainers.remove(id);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.Store#getListContainerIds()
+     */
+    public Set getListContainerIds(){
+        checkClosed();
+        return java.util.Collections.unmodifiableSet(listContainers.keySet());
+    }
+
+    public void dumpFreeSpace(PrintWriter printer){
+        checkClosed();
+        synchronized(mutex){
+            freeSpaceManager.dump(printer);
+        }
+    }
+
+
+    protected long storeItem(Marshaller marshaller,Object payload,Item item) throws IOException{
+        synchronized(mutex){
+            int payloadSize = writer.loadPayload(marshaller, payload, item);
+            item.setSize(payloadSize);
+            // free space manager will set offset and write any headers required
+            // so the position should now be correct for writing
+            item=freeSpaceManager.getFreeSpace(item);
+            writer.storeItem(item,payloadSize);
+        }
+        return item.getOffset();
+    }
+
+    protected Object readItem(Marshaller marshaller,Item item) throws IOException{
+        synchronized(mutex){
+            return reader.readItem(marshaller, item);
+        }
+    }
+    
+    protected void readHeader(Item item) throws IOException{
+        synchronized(mutex){
+            reader.readHeader(item);
+        }
+    }
+    
+    protected void readLocation(Item item) throws IOException{
+        synchronized(mutex){
+            reader.readLocation(item);
+        }
+    }
+
+    protected void updateItem(Item item) throws IOException{
+        synchronized(mutex){
+            writer.updatePayload(item);
+        }
+    }
+
+    protected void removeItem(Item item) throws IOException{
+        synchronized(mutex){
+            freeSpaceManager.addFreeSpace(item);
+        }
+    }
+
+    private void initialization() throws IOException{
+        //add shutdown hook
+       addShutdownHook();
+        // check for new file
+        LocatableItem mapRoot=new LocatableItem();
+        LocatableItem listRoot=new LocatableItem();
+        if(dataFile.length()==0){
+            writer.allocateSpace(FreeSpaceManager.RESIZE_INCREMENT);
+            storeItem(RootContainer.rootMarshaller,"mapRoot",mapRoot);
+            storeItem(RootContainer.rootMarshaller,"listRoot",listRoot);
+        }else{
+            freeSpaceManager.scanStoredItems();
+            dataFile.seek(FreeSpaceManager.ROOT_SIZE);
+            mapRoot.setOffset(FreeSpaceManager.ROOT_SIZE);
+            readItem(RootContainer.rootMarshaller,mapRoot);
+            listRoot.setOffset(dataFile.getFilePointer());
+            readItem(RootContainer.rootMarshaller,listRoot);
+        }
+        rootMapContainer=new RootContainer("root",this,mapRoot);
+        rootMapContainer.load();
+        Set keys=rootMapContainer.keySet();
+        for(Iterator i=keys.iterator();i.hasNext();){
+            Object id=i.next();
+            if(id!=null){
+                LocatableItem item=(LocatableItem) rootMapContainer.get(id);
+                if(item!=null){
+                    MapContainer container=new MapContainerImpl(id,this,item);
+                    mapContainers.put(id,container);
+                }
+            }
+        }
+        rootListContainer=new RootContainer("root",this,listRoot);
+        rootListContainer.load();
+        keys=rootListContainer.keySet();
+        for(Iterator i=keys.iterator();i.hasNext();){
+            Object id=i.next();
+            if(id!=null){
+                LocatableItem item=(LocatableItem) rootListContainer.get(id);
+                if(item!=null){
+                    ListContainer container=new ListContainerImpl(id,this,item);
+                    listContainers.put(id,container);
+                }
+            }
+        }
+    }
+    
+    
+
+    
+
+    protected void checkClosed(){
+        if(closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+    
+    
+    protected void addShutdownHook() {
+      
+            shutdownHook = new Thread("Kaha Store implementation is shutting down") {
+                public void run() {
+                    if (!closed){
+                        try{
+                            //this needs to be really quick so ...
+                            closed = true;
+                            dataFile.close();
+                        }catch(Throwable e){
+                            log.error("Failed to close data file",e);
+                        }
+                    }
+                }
+            };
+            Runtime.getRuntime().addShutdownHook(shutdownHook);
+        
+    }
+
+    protected void removeShutdownHook() {
+        if (shutdownHook != null) {
+            try {
+                Runtime.getRuntime().removeShutdownHook(shutdownHook);
+            }
+            catch (Exception e) {
+                log.warn("Failed to run shutdown hook",e);
+            }
+        }
+    }
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,75 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.kaha.impl;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+/**
+ * Optimized Store reader
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreReader{
+    protected RandomAccessFile dataFile;
+    protected StoreByteArrayInputStream bytesIn;
+    protected DataInputStream dataIn;
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param file
+     */
+    StoreReader(RandomAccessFile file){
+        this.dataFile=file;
+        this.bytesIn=new StoreByteArrayInputStream();
+        this.dataIn=new DataInputStream(bytesIn);
+    }
+
+    protected void readHeader(Item item) throws IOException{
+        dataFile.seek(item.getOffset());
+        bytesIn.restart(Item.HEAD_SIZE);
+        dataFile.readFully(bytesIn.getRawData(),0,Item.HEAD_SIZE);
+        item.readHeader(dataIn);
+    }
+
+    protected void readLocation(Item item) throws IOException{
+        readHeader(item);
+        bytesIn.restart(Item.LOCATION_SIZE);
+        dataFile.readFully(bytesIn.getRawData(),0,Item.LOCATION_SIZE);
+        item.readLocation(dataIn);
+    }
+
+    protected Object readItem(Marshaller marshaller,Item item) throws IOException{
+        readHeader(item);
+        byte[] data=new byte[item.getSize()];
+        dataFile.readFully(data);
+        bytesIn.restart(data);
+        return item.readPayload(marshaller,dataIn);
+    }
+
+    long length() throws IOException{
+        return dataFile.length();
+    }
+
+    long position() throws IOException{
+        return dataFile.getFilePointer();
+    }
+
+    void position(long newPosition) throws IOException{
+        dataFile.seek(newPosition);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,119 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+/**
+ * Optimized writes to a RandomAcessFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Optimized Store writer
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreWriter{
+    protected RandomAccessFile dataFile;
+    protected StoreByteArrayOutputStream bytesOut;
+    protected DataOutputStream dataOut;
+    
+    /**
+     * Construct a Store writer
+     * @param file
+     */
+    StoreWriter(RandomAccessFile file){
+        this.dataFile = file;
+        this.bytesOut = new StoreByteArrayOutputStream();
+        this.dataOut = new DataOutputStream(bytesOut);
+    }
+    
+    void updateHeader(Item item) throws IOException{
+        bytesOut.reset();
+        item.writeHeader(dataOut);
+        dataFile.seek(item.getOffset());
+        dataFile.write(bytesOut.getData(),0,bytesOut.size());
+    }
+    
+    void updatePayload(Item item) throws IOException{
+        bytesOut.reset();
+        dataFile.seek(item.getOffset() + Item.HEAD_SIZE);
+        item.writeLocation(dataOut);
+        dataFile.write(bytesOut.getData(),0,bytesOut.size());
+    }
+    
+    int loadPayload(Marshaller marshaller, Object payload,Item item) throws IOException{
+        bytesOut.reset();
+        bytesOut.position(Item.HEAD_SIZE);
+        item.writePayload(marshaller, payload, dataOut);
+        return bytesOut.size() - Item.HEAD_SIZE;
+    }
+    
+    void storeItem(Item item,int payloadSize) throws IOException{
+        bytesOut.reset();
+        item.writeHeader(dataOut);
+        dataFile.seek(item.getOffset());
+        dataFile.write(bytesOut.getData(),0,payloadSize+Item.HEAD_SIZE);
+        
+    }
+    
+   
+    
+    void writeShort(long offset, int value) throws IOException{
+        bytesOut.reset();
+        dataFile.seek(offset);
+        dataOut.writeShort(value);    
+        dataFile.write(bytesOut.getData(),0,bytesOut.size());
+    }
+    
+    void writeInt(long offset,int value) throws IOException{
+        bytesOut.reset();
+        dataFile.seek(offset);
+        dataOut.writeInt(value);    
+        dataFile.write(bytesOut.getData(),0,bytesOut.size());
+    }
+    
+    void writeLong(long offset,long value) throws IOException{
+        bytesOut.reset();
+        dataFile.seek(offset);
+        dataOut.writeLong(value);    
+        dataFile.write(bytesOut.getData(),0,bytesOut.size());
+    }
+    
+    long length() throws IOException{
+        return dataFile.length();
+    }
+    
+    long position() throws IOException{
+        return dataFile.getFilePointer();
+    }
+    
+    void position(long newPosition) throws IOException{
+        dataFile.seek(newPosition);
+    }
+    
+    void allocateSpace(long newLength) throws IOException{
+        dataFile.getFD().sync();
+        long currentOffset=dataFile.getFilePointer();
+        dataFile.seek(newLength);
+        dataFile.write(0);
+        dataFile.seek(currentOffset);
+        dataFile.getFD().sync();
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/package.html
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/package.html?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/package.html (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/package.html Tue Mar 21 09:21:33 2006
@@ -0,0 +1,11 @@
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	fast message persistence implementation
+</p>
+
+</body>
+</html>

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,45 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Marshall an AtomicInteger
+ * @version $Revision: 1.10 $
+ */
+public class AtomicIntegerMarshaller implements Marshaller{
+   
+
+    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+       AtomicInteger ai = (AtomicInteger) object;
+       dataOut.writeInt(ai.get());
+       
+    }
+
+    public Object readPayload(DataInputStream dataIn) throws IOException{
+        int value = dataIn.readInt();
+        return new AtomicInteger(value);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java?rev=387586&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java Tue Mar 21 09:21:33 2006
@@ -0,0 +1,54 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.activeio.Packet;
+import org.activeio.command.WireFormat;
+import org.activeio.packet.ByteArrayPacket;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Marshall a Message or a MessageReference
+ * @version $Revision: 1.10 $
+ */
+public class CommandMarshaller implements Marshaller{
+    
+    private WireFormat wireFormat;
+    public CommandMarshaller(WireFormat wireFormat){
+        this.wireFormat = wireFormat;
+      
+    }
+    
+    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+        Packet packet = wireFormat.marshal(object);
+        byte[] data = packet.sliceAsBytes();
+        dataOut.writeInt(data.length);
+        dataOut.write(data);
+    }
+
+   
+    public Object readPayload(DataInputStream dataIn) throws IOException{
+        int size=dataIn.readInt();
+        byte[] data=new byte[size];
+        dataIn.readFully(data);
+        return wireFormat.unmarshal(new ByteArrayPacket(data));
+    }
+}



Mime
View raw message