activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r677944 [4/11] - in /activemq/sandbox/kahadb: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/impl/ src/main/java/org/apache/kahadb/impl/async/ s...
Date Fri, 18 Jul 2008 15:49:52 GMT
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ListContainerImpl.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ListContainerImpl.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ListContainerImpl.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,892 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.ContainerId;
+import org.apache.kahadb.ListContainer;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.RuntimeStoreException;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.impl.DataManager;
+import org.apache.kahadb.impl.data.Item;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexManager;
+
+/**
+ * Implementation of a ListContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ListContainerImpl extends BaseContainerImpl implements ListContainer {
+
+    private static final Log LOG = LogFactory.getLog(ListContainerImpl.class);
+    protected Marshaller marshaller = Store.OBJECT_MARSHALLER;
+
+    public ListContainerImpl(ContainerId id, IndexItem root, IndexManager indexManager,
+                             DataManager dataManager, boolean persistentIndex) throws IOException {
+        super(id, root, indexManager, dataManager, persistentIndex);
+    }
+
+    /**
+     * Reads the persisted index chain into memory, starting from the entry
+     * the root points at. Does nothing further when already loaded.
+     *
+     * @throws RuntimeStoreException if an IOException occurs while
+     *         initialising or walking the index
+     */
+    public synchronized void load() {
+        checkClosed();
+        if (loaded) {
+            return;
+        }
+        // The flag is raised before the walk starts; a failed walk leaves it set.
+        loaded = true;
+        try {
+            init();
+            long offset = root.getNextItem();
+            while (offset != Item.POSITION_NOT_SET) {
+                IndexItem entry = indexManager.getIndex(offset);
+                indexList.add(entry);
+                itemAdded(entry, indexList.size() - 1, getValue(entry));
+                offset = entry.getNextItem();
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to load container " + getId(), e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#unload()
+     */
+    public synchronized void unload() {
+        checkClosed();
+        if (loaded) {
+            loaded = false;
+            indexList.clear();
+
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.kahadb.ListContainer#setMarshaller(org.apache.kahadb.Marshaller)
+     */
+    public synchronized void setMarshaller(Marshaller marshaller) {
+        checkClosed();
+        this.marshaller = marshaller;
+    }
+
+    /**
+     * Compares this container element by element with another List.
+     *
+     * @param obj the object to compare with
+     * @return true when obj is a List of the same size whose elements are
+     *         pairwise identical or equal
+     */
+    public synchronized boolean equals(Object obj) {
+        load();
+        if (!(obj instanceof List)) {
+            return false;
+        }
+        List other = (List)obj;
+        if (other.size() != size()) {
+            return false;
+        }
+        for (int i = 0; i < indexList.size(); i++) {
+            Object theirs = other.get(i);
+            Object mine = get(i);
+            if (theirs != mine && (theirs == null || mine == null || !theirs.equals(mine))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#size()
+     */
+    public synchronized int size() {
+        load();
+        return indexList.size();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
+     */
+    public synchronized void addFirst(Object o) {
+        internalAddFirst(o);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
+     */
+    public synchronized void addLast(Object o) {
+        internalAddLast(o);
+    }
+
+    /**
+     * Removes the head of the list from the in-memory index and the store.
+     *
+     * @return the removed value, or null when there is no first entry
+     */
+    public synchronized Object removeFirst() {
+        load();
+        IndexItem head = indexList.getFirst();
+        if (head == null) {
+            return null;
+        }
+        itemRemoved(0);
+        Object value = getValue(head);
+        // Capture both neighbours before unlinking: the root precedes the head,
+        // and the second entry (if any) follows it.
+        IndexItem before = root;
+        IndexItem after = indexList.size() > 1 ? (IndexItem)indexList.get(1) : null;
+        indexList.removeFirst();
+        delete(head, before, after);
+        return value;
+    }
+
+    /**
+     * Removes the tail of the list from the in-memory index and the store.
+     *
+     * @return the removed value, or null when there is no last entry
+     */
+    public synchronized Object removeLast() {
+        load();
+        IndexItem tail = indexList.getLast();
+        if (tail == null) {
+            return null;
+        }
+        itemRemoved(indexList.size() - 1);
+        Object value = getValue(tail);
+        IndexItem before = indexList.getPrevEntry(tail);
+        // Nothing follows the tail.
+        IndexItem after = null;
+        indexList.removeLast();
+        delete(tail, before, after);
+        return value;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#isEmpty()
+     */
+    public synchronized boolean isEmpty() {
+        load();
+        return indexList.isEmpty();
+    }
+
+    /**
+     * Reports whether some stored value equals o. A null argument never
+     * matches.
+     *
+     * @param o the value to look for
+     * @return true if a non-null stored value is equal to o
+     */
+    public synchronized boolean contains(Object o) {
+        load();
+        if (o == null) {
+            return false;
+        }
+        for (IndexItem cursor = indexList.getFirst(); cursor != null; cursor = indexList.getNextEntry(cursor)) {
+            Object candidate = getValue(cursor);
+            if (candidate != null && candidate.equals(o)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#iterator()
+     */
+    public synchronized Iterator iterator() {
+        load();
+        return listIterator();
+    }
+
+    /**
+     * Copies every stored value, in list order, into a new array.
+     *
+     * @return an array holding the values read from the store
+     */
+    public synchronized Object[] toArray() {
+        load();
+        List<Object> values = new ArrayList<Object>(indexList.size());
+        for (IndexItem cursor = indexList.getFirst(); cursor != null; cursor = indexList.getNextEntry(cursor)) {
+            values.add(getValue(cursor));
+        }
+        return values.toArray();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#toArray(T[])
+     */
+    public synchronized Object[] toArray(Object[] a) {
+        load();
+        List<Object> tmp = new ArrayList<Object>(indexList.size());
+        IndexItem next = indexList.getFirst();
+        while (next != null) {
+            Object value = getValue(next);
+            tmp.add(value);
+            next = indexList.getNextEntry(next);
+        }
+        return tmp.toArray(a);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#add(E)
+     */
+    public synchronized boolean add(Object o) {
+        load();
+        addLast(o);
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#remove(java.lang.Object)
+     */
+    public synchronized boolean remove(Object o) {
+        load();
+        boolean result = false;
+        int pos = 0;
+        IndexItem next = indexList.getFirst();
+        while (next != null) {
+            Object value = getValue(next);
+            if (value != null && value.equals(o)) {
+                remove(next);
+                itemRemoved(pos);
+                result = true;
+                break;
+            }
+            next = indexList.getNextEntry(next);
+            pos++;
+        }
+        return result;
+    }
+
+    protected synchronized void remove(IndexItem item) {
+        IndexItem prev = indexList.getPrevEntry(item);
+        IndexItem next = indexList.getNextEntry(item);
+        indexList.remove(item);
+
+        delete(item, prev, next);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#containsAll(java.util.Collection)
+     */
+    public synchronized boolean containsAll(Collection c) {
+        load();
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            Object obj = i.next();
+            if (!contains(obj)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#addAll(java.util.Collection)
+     */
+    public synchronized boolean addAll(Collection c) {
+        load();
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            add(i.next());
+        }
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#addAll(int, java.util.Collection)
+     */
+    public synchronized boolean addAll(int index, Collection c) {
+        load();
+        boolean result = false;
+        ListIterator e1 = listIterator(index);
+        Iterator e2 = c.iterator();
+        while (e2.hasNext()) {
+            e1.add(e2.next());
+            result = true;
+        }
+        return result;
+    }
+
+    /**
+     * Removes the first occurrence of each element of c from this list.
+     *
+     * @param c the elements to remove
+     * @return true if at least one element was removed, i.e. the list
+     *         changed, as java.util.List#removeAll specifies
+     */
+    public synchronized boolean removeAll(Collection c) {
+        load();
+        boolean changed = false;
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            // OR, not AND: a single successful removal means the list changed,
+            // and an empty argument must report "unchanged".
+            changed |= remove(i.next());
+        }
+        return changed;
+    }
+
+    /**
+     * Keeps only the values that c contains: values not found in c are
+     * collected first and then removed one by one.
+     *
+     * @param c the values to keep
+     * @return true if at least one value was selected for removal
+     */
+    public synchronized boolean retainAll(Collection c) {
+        load();
+        List<Object> rejected = new ArrayList<Object>();
+        for (IndexItem cursor = indexList.getFirst(); cursor != null; cursor = indexList.getNextEntry(cursor)) {
+            Object value = getValue(cursor);
+            if (!c.contains(value)) {
+                rejected.add(value);
+            }
+        }
+        // Removal happens after the scan so the chain is not modified while walked.
+        for (Object value : rejected) {
+            remove(value);
+        }
+        return !rejected.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#clear()
+     */
+    public synchronized void clear() {
+        checkClosed();
+        super.clear();
+        doClear();
+
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#get(int)
+     */
+    public synchronized Object get(int index) {
+        load();
+        Object result = null;
+        IndexItem item = indexList.get(index);
+        if (item != null) {
+            result = getValue(item);
+        }
+        return result;
+    }
+
+    /**
+     * Replaces the element at index: the existing entry is deleted and
+     * element is then inserted at the same index through add(int, Object).
+     *
+     * @param index position of the element to replace
+     * @param element the new value
+     * @return the value previously read from that position; null when the
+     *         list was empty
+     * @see java.util.List#set(int, Object)
+     */
+    public synchronized Object set(int index, Object element) {
+        load();
+        Object result = null;
+        // On an empty list no entry is looked up and all three stay null.
+        IndexItem replace = indexList.isEmpty() ? null : (IndexItem)indexList.get(index);
+        // Neighbours are left null at either end of the list.
+        IndexItem prev = (indexList.isEmpty() || (index - 1) < 0) ? null : (IndexItem)indexList
+            .get(index - 1);
+        IndexItem next = (indexList.isEmpty() || (index + 1) >= size()) ? null : (IndexItem)indexList
+            .get(index + 1);
+        // Read the old value before its entry is deleted.
+        result = getValue(replace);
+        indexList.remove(index);
+        delete(replace, prev, next);
+        itemRemoved(index);
+        add(index, element);
+        return result;
+    }
+
+    protected synchronized IndexItem internalSet(int index, Object element) {
+        IndexItem replace = indexList.isEmpty() ? null : (IndexItem)indexList.get(index);
+        IndexItem prev = (indexList.isEmpty() || (index - 1) < 0) ? null : (IndexItem)indexList
+            .get(index - 1);
+        IndexItem next = (indexList.isEmpty() || (index + 1) >= size()) ? null : (IndexItem)indexList
+            .get(index + 1);
+        indexList.remove(index);
+        delete(replace, prev, next);
+        itemRemoved(index);
+        return internalAdd(index, element);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#add(int, E)
+     */
+    public synchronized void add(int index, Object element) {
+        load();
+        IndexItem item = insert(index, element);
+        indexList.add(index, item);
+        itemAdded(item, index, element);
+    }
+
+    protected synchronized StoreEntry internalAddLast(Object o) {
+        load();
+        IndexItem item = writeLast(o);
+        indexList.addLast(item);
+        itemAdded(item, indexList.size() - 1, o);
+        return item;
+    }
+
+    protected synchronized StoreEntry internalAddFirst(Object o) {
+        load();
+        IndexItem item = writeFirst(o);
+        indexList.addFirst(item);
+        itemAdded(item, 0, o);
+        return item;
+    }
+
+    protected synchronized IndexItem internalAdd(int index, Object element) {
+        load();
+        IndexItem item = insert(index, element);
+        indexList.add(index, item);
+        itemAdded(item, index, element);
+        return item;
+    }
+
+    protected synchronized StoreEntry internalGet(int index) {
+        load();
+        if (index >= 0 && index < indexList.size()) {
+            return indexList.get(index);
+        }
+        return null;
+    }
+
+    /**
+     * Removes the entry at index without reading its value back.
+     *
+     * @param index position of the entry to remove
+     * @return true if indexList had an entry at index and it was removed
+     */
+    public synchronized boolean doRemove(int index) {
+        load();
+        IndexItem item = indexList.get(index);
+        if (item == null) {
+            return false;
+        }
+        IndexItem prev = indexList.getPrevEntry(item);
+        prev = prev != null ? prev : root;
+        // The successor is the entry after item itself. Asking for prev's next
+        // would return item, the very entry being deleted, and relink it.
+        IndexItem next = indexList.getNextEntry(item);
+        indexList.remove(index);
+        itemRemoved(index);
+        delete(item, prev, next);
+        return true;
+    }
+
+    /**
+     * Removes the entry at index and hands back its value.
+     *
+     * @param index position of the entry to remove
+     * @return the removed value, or null when indexList has no entry at index
+     */
+    public synchronized Object remove(int index) {
+        load();
+        IndexItem target = indexList.get(index);
+        if (target == null) {
+            return null;
+        }
+        itemRemoved(index);
+        Object value = getValue(target);
+        IndexItem before = indexList.getPrevEntry(target);
+        if (before == null) {
+            // The first entry hangs off the root.
+            before = root;
+        }
+        IndexItem after = indexList.getNextEntry(target);
+        indexList.remove(index);
+        delete(target, before, after);
+        return value;
+    }
+
+    /**
+     * Finds the position of the first stored value equal to o.
+     *
+     * @param o the value to look for; null is never found
+     * @return the zero-based position, or -1 when there is no match
+     */
+    public synchronized int indexOf(Object o) {
+        load();
+        if (o == null) {
+            return -1;
+        }
+        int position = 0;
+        for (IndexItem cursor = indexList.getFirst(); cursor != null; cursor = indexList.getNextEntry(cursor)) {
+            Object candidate = getValue(cursor);
+            if (candidate != null && candidate.equals(o)) {
+                return position;
+            }
+            position++;
+        }
+        return -1;
+    }
+
+    /**
+     * Finds the position of the last stored value equal to o by walking the
+     * list backwards from its tail.
+     *
+     * @param o the value to look for; null is never found
+     * @return the zero-based position, or -1 when there is no match
+     */
+    public synchronized int lastIndexOf(Object o) {
+        load();
+        if (o == null) {
+            return -1;
+        }
+        int position = indexList.size() - 1;
+        for (IndexItem cursor = indexList.getLast(); cursor != null; cursor = indexList.getPrevEntry(cursor)) {
+            Object candidate = getValue(cursor);
+            if (candidate != null && candidate.equals(o)) {
+                return position;
+            }
+            position--;
+        }
+        return -1;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.List#listIterator()
+     */
+    public synchronized ListIterator listIterator() {
+        load();
+        return new ContainerListIterator(this, indexList, indexList.getRoot());
+    }
+
+    /**
+     * Creates a list iterator that starts from the entry preceding index; the
+     * root entry plays that role for index 0, exactly as in listIterator().
+     *
+     * @param index position the iteration should begin at
+     * @return a ContainerListIterator anchored on the preceding entry
+     * @see java.util.List#listIterator(int)
+     */
+    public synchronized ListIterator listIterator(int index) {
+        load();
+        // Any index above 0 has a real predecessor at index - 1. Testing
+        // (index - 1) > 0 wrongly anchored index 1 on the root as well.
+        IndexItem start = index > 0 ? indexList.get(index - 1) : indexList.getRoot();
+        return new ContainerListIterator(this, indexList, start);
+    }
+
+    /**
+     * Builds a new list holding the values from fromIndex (inclusive) up to
+     * toIndex (exclusive). The result is a freshly created ArrayList filled
+     * with the values read here, not a view onto this container.
+     *
+     * @param fromIndex position of the first value to copy
+     * @param toIndex position just past the last value to copy
+     * @return a new list with the copied values
+     * @see java.util.List#subList(int, int)
+     */
+    public synchronized List<Object> subList(int fromIndex, int toIndex) {
+        load();
+        List<Object> result = new ArrayList<Object>();
+        int count = fromIndex;
+        IndexItem next = indexList.get(fromIndex);
+        // Also stops early when the chain ends before toIndex is reached.
+        while (next != null && count++ < toIndex) {
+            result.add(getValue(next));
+            next = indexList.getNextEntry(next);
+        }
+        return result;
+    }
+
+    /**
+     * add an Object to the list but get a StoreEntry of its position
+     * 
+     * @param object
+     * @return the entry in the Store
+     */
+    public synchronized StoreEntry placeLast(Object object) {
+        StoreEntry item = internalAddLast(object);
+        return item;
+    }
+
+    /**
+     * insert an Object in first position in the list but get a StoreEntry of
+     * its position
+     * 
+     * @param object
+     * @return the location in the Store
+     */
+    public synchronized StoreEntry placeFirst(Object object) {
+        StoreEntry item = internalAddFirst(object);
+        return item;
+    }
+
+    /**
+     * Overwrites the stored value that entry refers to with object, using the
+     * current marshaller.
+     *
+     * @param entry the entry whose value data item is rewritten
+     * @param object the new value
+     * @throws RuntimeStoreException if the data manager fails to write
+     * @see org.apache.kahadb.ListContainer#update(org.apache.kahadb.StoreEntry,
+     *      java.lang.Object)
+     */
+    public synchronized void update(StoreEntry entry, Object object) {
+        try {
+            dataManager.updateItem(entry.getValueDataItem(), marshaller, object);
+        } catch (IOException e) {
+            // Log and wrap the same way as the other store operations in this class.
+            LOG.error("Failed to update " + object, e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    /**
+     * Retrieve an Object from the Store by its location
+     * 
+     * @param entry
+     * @return the Object at that entry
+     */
+    public synchronized Object get(final StoreEntry entry) {
+        load();
+        StoreEntry entryToUse = refresh(entry);
+        return getValue(entryToUse);
+    }
+
+    /**
+     * remove the Object at the StoreEntry
+     * 
+     * @param entry
+     * @return true if successful
+     */
+    public synchronized boolean remove(StoreEntry entry) {
+        IndexItem item = (IndexItem)entry;
+        load();
+        boolean result = false;
+        if (item != null) {
+
+            remove(item);
+            result = true;
+        }
+        return result;
+    }
+
+    /**
+     * Get the StoreEntry for the first item of the list
+     * 
+     * @return the first StoreEntry or null if the list is empty
+     */
+    public synchronized StoreEntry getFirst() {
+        load();
+        return indexList.getFirst();
+    }
+
+    /**
+     * Get the StoreEntry for the last item of the list
+     * 
+     * @return the last StoreEntry or null if the list is empty
+     */
+    public synchronized StoreEntry getLast() {
+        load();
+        return indexList.getLast();
+    }
+
+    /**
+     * Get the next StoreEntry from the list
+     * 
+     * @param entry
+     * @return the next StoreEntry or null
+     */
+    public synchronized StoreEntry getNext(StoreEntry entry) {
+        load();
+        IndexItem item = (IndexItem)entry;
+        return indexList.getNextEntry(item);
+    }
+
+    /**
+     * Get the previous StoreEntry from the list
+     * 
+     * @param entry
+     * @return the previous store entry or null
+     */
+    public synchronized StoreEntry getPrevious(StoreEntry entry) {
+        load();
+        IndexItem item = (IndexItem)entry;
+        return indexList.getPrevEntry(item);
+    }
+
+    /**
+     * It's possible that a StoreEntry could become stale; this will return an
+     * up-to-date entry for the StoreEntry position
+     * 
+     * @param entry old entry
+     * @return a refreshed StoreEntry
+     */
+    public synchronized StoreEntry refresh(StoreEntry entry) {
+        load();
+        return indexList.getEntry(entry);
+    }
+
+    protected synchronized IndexItem writeLast(Object value) {
+        IndexItem index = null;
+        try {
+            if (value != null) {
+                StoreLocation data = dataManager.storeDataItem(marshaller, value);
+                index = indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev = indexList.getLast();
+                prev = prev != null ? prev : root;
+                IndexItem next = indexList.getNextEntry(prev);
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                updateIndexes(prev);
+                if (next != null) {
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    updateIndexes(next);
+                }
+                storeIndex(index);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to write " + value, e);
+            throw new RuntimeStoreException(e);
+        }
+        return index;
+    }
+
+    protected synchronized IndexItem writeFirst(Object value) {
+        IndexItem index = null;
+        try {
+            if (value != null) {
+                StoreLocation data = dataManager.storeDataItem(marshaller, value);
+                index = indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev = root;
+                IndexItem next = indexList.getNextEntry(prev);
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                updateIndexes(prev);
+                if (next != null) {
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    updateIndexes(next);
+                }
+                storeIndex(index);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to write " + value, e);
+            throw new RuntimeStoreException(e);
+        }
+        return index;
+    }
+
+    /**
+     * Stores value in the data file and links a new index entry for it into
+     * the persisted chain at position insertPos. The entry is not added to
+     * indexList here; callers do that with the returned item.
+     *
+     * @param insertPos target position; zero or less inserts directly after
+     *        the root, the current size or more appends after the last entry
+     * @param value the value to store; nothing is written when it is null
+     * @return the new index entry, or null when value is null
+     * @throws RuntimeStoreException if writing the data or the index fails
+     */
+    protected synchronized IndexItem insert(int insertPos, Object value) {
+        IndexItem index = null;
+        try {
+            if (value != null) {
+                StoreLocation data = dataManager.storeDataItem(marshaller, value);
+                index = indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev = null;
+                IndexItem next = null;
+                if (insertPos <= 0) {
+                    prev = root;
+                    next = indexList.getNextEntry(root);
+                } else if (insertPos >= indexList.size()) {
+                    prev = indexList.getLast();
+                    if (prev == null) {
+                        prev = root;
+                    }
+                    next = null;
+                } else {
+                    // The new entry goes between the entries currently at
+                    // insertPos - 1 and insertPos, which is where callers put it
+                    // in memory via indexList.add(index, item). Using the entry
+                    // at insertPos as predecessor persisted it one slot too late.
+                    prev = indexList.get(insertPos - 1);
+                    prev = prev != null ? prev : root;
+                    next = indexList.getNextEntry(prev);
+                }
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                updateIndexes(prev);
+                if (next != null) {
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    updateIndexes(next);
+                }
+                storeIndex(index);
+                indexList.setRoot(root);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to insert " + value, e);
+            throw new RuntimeStoreException(e);
+        }
+        return index;
+    }
+
+    /**
+     * Reads the value an entry points at, using the current marshaller.
+     *
+     * @param item the entry to resolve; may be null
+     * @return the value read from the data manager, or null when item is null
+     * @throws RuntimeStoreException if the data cannot be read
+     */
+    protected synchronized Object getValue(StoreEntry item) {
+        if (item == null) {
+            return null;
+        }
+        try {
+            StoreLocation location = item.getValueDataItem();
+            return dataManager.readItem(marshaller, location);
+        } catch (IOException e) {
+            LOG.error("Failed to get value for " + item, e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    /**
+     * Renders the elements in iteration order, comma-separated and wrapped in
+     * square brackets.
+     *
+     * @return a string representation of this collection.
+     */
+    public synchronized String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("[");
+        Iterator it = iterator();
+        // hasNext() is consulted once up front and once after every element.
+        for (boolean more = it.hasNext(); more;) {
+            text.append(String.valueOf(it.next()));
+            more = it.hasNext();
+            if (more) {
+                text.append(", ");
+            }
+        }
+        text.append("]");
+        return text.toString();
+    }
+
+    protected synchronized void itemAdded(IndexItem item, int pos, Object value) {
+
+    }
+
+    protected synchronized void itemRemoved(int pos) {
+
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/MapContainerImpl.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/MapContainerImpl.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/MapContainerImpl.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.ContainerId;
+import org.apache.kahadb.IndexMBean;
+import org.apache.kahadb.MapContainer;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.RuntimeStoreException;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.impl.DataManager;
+import org.apache.kahadb.impl.data.Item;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.impl.index.VMIndex;
+import org.apache.kahadb.impl.index.hash.HashIndex;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Implementation of a MapContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {
+
+    private static final Log LOG = LogFactory.getLog(MapContainerImpl.class);
+    protected Index index;
+    protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
+    protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
+    protected File directory;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
+    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
+
+    /**
+     * Creates the container; all arguments except the directory are handed to
+     * the base container constructor.
+     *
+     * @param directory directory later passed to the HashIndex created by init()
+     * @param id the container id
+     * @param root the root index item
+     * @param indexManager the index manager
+     * @param dataManager the data manager
+     * @param persistentIndex whether init() creates a HashIndex (true) or a
+     *                VMIndex (false)
+     */
+    public MapContainerImpl(File directory,
+                            ContainerId id,
+                            IndexItem root,
+                            IndexManager indexManager,
+                            DataManager dataManager,
+                            boolean persistentIndex) {
+        super(id, root, indexManager, dataManager, persistentIndex);
+        this.directory = directory;
+    }
+
+    public synchronized void init() {
+        super.init();
+        if (index == null) {
+            if (persistentIndex) {
+                String name = containerId.getDataContainerName() + "_" + containerId.getKey();
+                name=IOHelper.toFileSystemSafeName(name);
+                try {
+                    HashIndex hashIndex = new HashIndex(directory, name, indexManager);
+                    hashIndex.setNumberOfBins(getIndexBinSize());
+                    hashIndex.setKeySize(getIndexKeySize());
+                    hashIndex.setPageSize(getIndexPageSize());
+                    hashIndex.setMaximumCapacity(getIndexMaxBinSize());
+                    hashIndex.setLoadFactor(getIndexLoadFactor());
+                    this.index = hashIndex;
+                } catch (IOException e) {
+                    LOG.error("Failed to create HashIndex", e);
+                    throw new RuntimeException(e);
+                }
+            } else {
+                this.index = new VMIndex(indexManager);
+            }
+        }
+        index.setKeyMarshaller(keyMarshaller);
+    }
+
+    /**
+     * Loads the container on first use: initialises it, loads the index and
+     * walks the persisted item chain starting at the root, adding every item
+     * to the in-memory index list. Keys are stored in the index only when the
+     * index reports itself as transient. Does nothing when already loaded.
+     *
+     * @throws RuntimeStoreException wrapping any IOException raised while loading
+     * @see org.apache.kahadb.MapContainer#load()
+     */
+    public synchronized void load() {
+        checkClosed();
+        if (loaded) {
+            return;
+        }
+        // The flag is raised before the work starts, so init() and the
+        // accessors used below do not re-enter this method.
+        // NOTE(review): it stays raised if loading fails part-way - confirm
+        // that callers expect this.
+        loaded = true;
+        try {
+            init();
+            index.load();
+            long nextItem = root.getNextItem();
+            while (nextItem != Item.POSITION_NOT_SET) {
+                IndexItem item = indexManager.getIndex(nextItem);
+                StoreLocation data = item.getKeyDataItem();
+                // The key is read for every item, also when it is not stored below.
+                Object key = dataManager.readItem(keyMarshaller, data);
+                if (index.isTransient()) {
+                    index.store(key, item);
+                }
+                indexList.add(item);
+                nextItem = item.getNextItem();
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to load container " + getId(), e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    /**
+     * Unloads the container if it is loaded: unloads the index and empties
+     * the in-memory index list. A failure to unload the index is only logged.
+     *
+     * @see org.apache.kahadb.MapContainer#unload()
+     */
+    public synchronized void unload() {
+        checkClosed();
+        if (!loaded) {
+            return;
+        }
+        loaded = false;
+        try {
+            index.unload();
+        } catch (IOException ioe) {
+            // best effort: the list is cleared regardless
+            LOG.warn("Failed to unload the index", ioe);
+        }
+        indexList.clear();
+    }
+
+    /**
+     * Sets the marshaller used for keys and forwards it to the index when
+     * one has already been created.
+     *
+     * @param keyMarshaller the marshaller for keys
+     */
+    public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
+        checkClosed();
+        this.keyMarshaller = keyMarshaller;
+        if (index == null) {
+            return;
+        }
+        index.setKeyMarshaller(keyMarshaller);
+    }
+
+    /**
+     * Sets the marshaller used to read and write values.
+     *
+     * @param valueMarshaller the marshaller for values
+     */
+    public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
+        checkClosed();
+        this.valueMarshaller = valueMarshaller;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#size()
+     */
+    public synchronized int size() {
+        load();
+        return indexList.size();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#isEmpty()
+     */
+    public synchronized boolean isEmpty() {
+        load();
+        return indexList.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
+     */
+    public synchronized boolean containsKey(Object key) {
+        load();
+        try {
+            return index.containsKey(key);
+        } catch (IOException e) {
+            LOG.error("Failed trying to find key: " + key, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
+     */
+    public synchronized Object get(Object key) {
+        load();
+        Object result = null;
+        StoreEntry item = null;
+        try {
+            item = index.get(key);
+        } catch (IOException e) {
+            LOG.error("Failed trying to get key: " + key, e);
+            throw new RuntimeException(e);
+        }
+        if (item != null) {
+            result = getValue(item);
+        }
+        return result;
+    }
+
+    /**
+     * Get the StoreEntry associated with the key
+     * 
+     * @param key
+     * @return the StoreEntry
+     */
+    public synchronized StoreEntry getEntry(Object key) {
+        load();
+        StoreEntry item = null;
+        try {
+            item = index.get(key);
+        } catch (IOException e) {
+            LOG.error("Failed trying to get key: " + key, e);
+            throw new RuntimeException(e);
+        }
+        return item;
+    }
+
+    /**
+     * Scans the index list from the first item and reads each value until one
+     * equals the argument.
+     *
+     * @param o the value to search for; null never matches
+     * @return true when some stored value equals o
+     * @see org.apache.kahadb.MapContainer#containsValue(java.lang.Object)
+     */
+    public synchronized boolean containsValue(Object o) {
+        load();
+        if (o == null) {
+            return false;
+        }
+        for (IndexItem current = indexList.getFirst(); current != null; current = indexList.getNextEntry(current)) {
+            Object candidate = getValue(current);
+            if (candidate != null && candidate.equals(o)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Puts every entry of the given map into this container, in the iteration
+     * order of the map's entry set.
+     *
+     * @param t the map to copy from; null is ignored
+     * @see org.apache.kahadb.MapContainer#putAll(java.util.Map)
+     */
+    public synchronized void putAll(Map t) {
+        load();
+        if (t == null) {
+            return;
+        }
+        Iterator entries = t.entrySet().iterator();
+        while (entries.hasNext()) {
+            Map.Entry pair = (Map.Entry)entries.next();
+            put(pair.getKey(), pair.getValue());
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#keySet()
+     */
+    public synchronized Set keySet() {
+        load();
+        return new ContainerKeySet(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#values()
+     */
+    public synchronized Collection values() {
+        load();
+        return new ContainerValueCollection(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#entrySet()
+     */
+    public synchronized Set entrySet() {
+        load();
+        return new ContainerEntrySet(this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
+     *      java.lang.Object)
+     */
+    public synchronized Object put(Object key, Object value) {
+        load();
+        Object result = remove(key);
+        IndexItem item = write(key, value);
+        try {
+            index.store(key, item);
+        } catch (IOException e) {
+            LOG.error("Failed trying to insert key: " + key, e);
+            throw new RuntimeException(e);
+        }
+        indexList.add(item);
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
+     */
+    public synchronized Object remove(Object key) {
+        load();
+        try {
+            Object result = null;
+            IndexItem item = (IndexItem)index.remove(key);
+            if (item != null) {
+                // refresh the index
+                item = (IndexItem)indexList.refreshEntry(item);
+                result = getValue(item);
+                IndexItem prev = indexList.getPrevEntry(item);
+                IndexItem next = indexList.getNextEntry(item);
+                indexList.remove(item);
+                delete(item, prev, next);
+            }
+            return result;
+        } catch (IOException e) {
+            LOG.error("Failed trying to remove key: " + key, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Removes the first entry, in index list order, whose value equals the
+     * argument. The entry is removed through its key; when the key reads as
+     * null nothing is removed but the match is still reported.
+     *
+     * @param o the value to search for; null never matches
+     * @return true when a matching value was found
+     */
+    public synchronized boolean removeValue(Object o) {
+        load();
+        if (o == null) {
+            return false;
+        }
+        for (IndexItem current = indexList.getFirst(); current != null; current = indexList.getNextEntry(current)) {
+            Object candidate = getValue(current);
+            if (candidate == null || !candidate.equals(o)) {
+                continue;
+            }
+            // find the key of the matching entry
+            Object key = getKey(current);
+            if (key != null) {
+                remove(key);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Removes the entry for an index item by reading its key and removing
+     * that key; nothing happens when the key reads as null.
+     *
+     * @param item the index item to remove
+     */
+    protected synchronized void remove(IndexItem item) {
+        Object key = getKey(item);
+        if (key == null) {
+            return;
+        }
+        remove(key);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.MapContainer#clear()
+     */
+    public synchronized void clear() {
+        checkClosed();
+        loaded = true;
+        init();
+        if (index != null) {
+            try {
+                index.clear();
+            } catch (IOException e) {
+                LOG.error("Failed trying clear index", e);
+                throw new RuntimeException(e);
+            }
+        }
+        super.clear();
+        doClear();
+    }
+
+    /**
+     * Add an entry to the Store Map
+     * 
+     * @param key
+     * @param value
+     * @return the StoreEntry associated with the entry
+     */
+    public synchronized StoreEntry place(Object key, Object value) {
+        load();
+        try {
+            remove(key);
+            IndexItem item = write(key, value);
+            index.store(key, item);
+            indexList.add(item);
+            return item;
+        } catch (IOException e) {
+            LOG.error("Failed trying to place key: " + key, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Remove an Entry from ther Map
+     * 
+     * @param entry
+     * @throws IOException
+     */
+    public synchronized void remove(StoreEntry entry) {
+        load();
+        IndexItem item = (IndexItem)entry;
+        if (item != null) {
+            Object key = getKey(item);
+            try {
+                index.remove(key);
+            } catch (IOException e) {
+                LOG.error("Failed trying to remove entry: " + entry, e);
+                throw new RuntimeException(e);
+            }
+            IndexItem prev = indexList.getPrevEntry(item);
+            IndexItem next = indexList.getNextEntry(item);
+            indexList.remove(item);
+            delete(item, prev, next);
+        }
+    }
+
+    public synchronized StoreEntry getFirst() {
+        load();
+        return indexList.getFirst();
+    }
+
+    public synchronized StoreEntry getLast() {
+        load();
+        return indexList.getLast();
+    }
+
+    /**
+     * Returns the entry following the given one in the index list.
+     *
+     * @param entry the current entry; it is cast to IndexItem
+     * @return the next entry as reported by the index list
+     */
+    public synchronized StoreEntry getNext(StoreEntry entry) {
+        load();
+        return indexList.getNextEntry((IndexItem)entry);
+    }
+
+    /**
+     * Returns the entry preceding the given one in the index list.
+     *
+     * @param entry the current entry; it is cast to IndexItem
+     * @return the previous entry as reported by the index list
+     */
+    public synchronized StoreEntry getPrevious(StoreEntry entry) {
+        load();
+        return indexList.getPrevEntry((IndexItem)entry);
+    }
+
+    public synchronized StoreEntry refresh(StoreEntry entry) {
+        load();
+        return indexList.getEntry(entry);
+    }
+
+    /**
+     * Get the value from it's location
+     * 
+     * @param item
+     * @return the value associated with the store entry
+     */
+    public synchronized Object getValue(StoreEntry item) {
+        load();
+        Object result = null;
+        if (item != null) {
+            try {
+                // ensure this value is up to date
+                // item=indexList.getEntry(item);
+                StoreLocation data = item.getValueDataItem();
+                result = dataManager.readItem(valueMarshaller, data);
+            } catch (IOException e) {
+                LOG.error("Failed to get value for " + item, e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get the Key object from it's location
+     * 
+     * @param item
+     * @return the Key Object associated with the StoreEntry
+     */
+    public synchronized Object getKey(StoreEntry item) {
+        load();
+        Object result = null;
+        if (item != null) {
+            try {
+                StoreLocation data = item.getKeyDataItem();
+                result = dataManager.readItem(keyMarshaller, data);
+            } catch (IOException e) {
+                LOG.error("Failed to get key for " + item, e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return the index list backing this container (no load is triggered)
+     */
+    protected IndexLinkedList getItemList() {
+        return this.indexList;
+    }
+
+    /**
+     * Persists a key/value pair as a new index item and links it into the
+     * persisted item chain after the current last item (or after the root
+     * when the list is empty). The item is not added to the in-memory index
+     * list or to the key index here.
+     *
+     * @param key the key to store
+     * @param value the value to store; when null no value data is written
+     * @return the newly created index item
+     * @throws RuntimeStoreException wrapping any IOException raised while writing
+     */
+    protected synchronized IndexItem write(Object key, Object value) {
+        // NOTE: this local shadows the 'index' field for the rest of the method.
+        IndexItem index = null;
+        try {
+            index = indexManager.createNewIndex();
+            StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
+            index.setKeyData(data);
+
+            if (value != null) {
+                data = dataManager.storeDataItem(valueMarshaller, value);
+                index.setValueData(data);
+            }
+            // Predecessor is the last list item, falling back to the root.
+            IndexItem prev = indexList.getLast();
+            prev = prev != null ? prev : indexList.getRoot();
+            IndexItem next = indexList.getNextEntry(prev);
+            // Link prev <-> new item and persist the changed predecessor.
+            prev.setNextItem(index.getOffset());
+            index.setPreviousItem(prev.getOffset());
+            updateIndexes(prev);
+            if (next != null) {
+                // Link new item <-> next and persist the changed successor.
+                next.setPreviousItem(index.getOffset());
+                index.setNextItem(next.getOffset());
+                updateIndexes(next);
+            }
+            // The new item is stored once its links are in place.
+            storeIndex(index);
+        } catch (IOException e) {
+            LOG.error("Failed to write " + key + " , " + value, e);
+            throw new RuntimeStoreException(e);
+        }
+        return index;
+    }
+
+    /**
+     * @return the number of bins init() applies to a newly created HashIndex
+     */
+    public int getIndexBinSize() {
+        return this.indexBinSize;
+    }
+
+    /**
+     * @param indexBinSize the number of bins; read by init() when it creates
+     *                the HashIndex
+     */
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    /**
+     * @return the key size init() applies to a newly created HashIndex
+     */
+    public int getIndexKeySize() {
+        return this.indexKeySize;
+    }
+
+    /**
+     * @param indexKeySize the key size; read by init() when it creates the
+     *                HashIndex
+     */
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    /**
+     * @return the page size init() applies to a newly created HashIndex
+     */
+    public int getIndexPageSize() {
+        return this.indexPageSize;
+    }
+
+    /**
+     * @param indexPageSize the page size; read by init() when it creates the
+     *                HashIndex
+     */
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+    
+    /**
+     * @return the load factor init() applies to a newly created HashIndex
+     */
+    public int getIndexLoadFactor() {
+        return this.indexLoadFactor;
+    }
+
+    /**
+     * @param loadFactor the load factor; read by init() when it creates the
+     *                HashIndex
+     */
+    public void setIndexLoadFactor(int loadFactor) {
+        indexLoadFactor = loadFactor;
+    }
+
+  
+    public IndexMBean getIndexMBean() {
+      return (IndexMBean) index;
+    }
+  
+    /**
+     * @return the maximum capacity init() applies to a newly created HashIndex
+     */
+    public int getIndexMaxBinSize() {
+        return this.indexMaxBinSize;
+    }
+
+    /**
+     * @param maxBinSize the maximum capacity; read by init() when it creates
+     *                the HashIndex
+     */
+    public void setIndexMaxBinSize(int maxBinSize) {
+        indexMaxBinSize = maxBinSize;
+    }
+   
+
+   
+    /**
+     * Renders the entries in entry-set iteration order as "{k1=v1, k2=v2}".
+     *
+     * @return a string representation of this map
+     */
+    public String toString() {
+        load();
+        StringBuffer buf = new StringBuffer();
+        buf.append("{");
+        Iterator i = entrySet().iterator();
+        boolean hasNext = i.hasNext();
+        while (hasNext) {
+            // Qualified as Map.Entry: only java.util.Map is imported, so the
+            // simple name Entry does not resolve through the file's imports.
+            Map.Entry e = (Map.Entry) i.next();
+            buf.append(e.getKey());
+            buf.append("=");
+            buf.append(e.getValue());
+            hasNext = i.hasNext();
+            if (hasNext) {
+                buf.append(", ");
+            }
+        }
+        buf.append("}");
+        return buf.toString();
+    }
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/MapContainerImpl.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Map and List container implementations for Kaha
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataFile.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataFile.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * Bookkeeping for one numbered data file: a lazily opened
+ * {@link RandomAccessFile}, the tracked length, a reference count, a dirty
+ * flag and opaque writer data.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFile {
+
+    private final File file;
+    private final Integer number;
+    private int referenceCount;
+    private RandomAccessFile randomAccessFile;
+    private Object writerData;
+    private long length;
+    private boolean dirty;
+
+    /**
+     * @param file the backing file; its current size seeds the tracked length
+     *                (0 when the file does not exist)
+     * @param number the number identifying this data file
+     */
+    DataFile(File file, int number) {
+        this.file = file;
+        this.number = Integer.valueOf(number);
+        length = file.exists() ? file.length() : 0;
+    }
+
+    /**
+     * @return the number identifying this data file
+     */
+    Integer getNumber() {
+        return number;
+    }
+
+    /**
+     * Opens the backing file in "rw" mode on first use and caches the handle.
+     *
+     * @return the cached handle
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException {
+        if (randomAccessFile == null) {
+            randomAccessFile = new RandomAccessFile(file, "rw");
+        }
+        return randomAccessFile;
+    }
+
+    /**
+     * @return the tracked length, i.e. the initial file size plus all
+     *         increments; the file itself is not consulted again
+     */
+    synchronized long getLength() {
+        return length;
+    }
+
+    /**
+     * @param size the amount added to the tracked length
+     */
+    synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    /**
+     * Closes the cached handle, if any, and forgets it so that the next
+     * {@link #getRandomAccessFile()} call opens a fresh one.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void purge() throws IOException {
+        if (randomAccessFile != null) {
+            randomAccessFile.close();
+            randomAccessFile = null;
+        }
+    }
+
+    /**
+     * Releases the cached handle and deletes the backing file.
+     *
+     * @return the result of {@link File#delete()}
+     * @throws IOException if closing the handle fails
+     */
+    synchronized boolean delete() throws IOException {
+        purge();
+        return file.delete();
+    }
+
+    /**
+     * Closes the cached handle, if any. The handle is also forgotten, so a
+     * later {@link #getRandomAccessFile()} call never hands out a closed
+     * handle.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void close() throws IOException {
+        purge();
+    }
+
+    /**
+     * @return the reference count after incrementing it
+     */
+    synchronized int increment() {
+        return ++referenceCount;
+    }
+
+    /**
+     * @return the reference count after decrementing it
+     */
+    synchronized int decrement() {
+        return --referenceCount;
+    }
+
+    /**
+     * @return true when the reference count is zero or negative
+     */
+    synchronized boolean isUnused() {
+        return referenceCount <= 0;
+    }
+
+    /**
+     * @return the file name, number, tracked length and reference count
+     */
+    public String toString() {
+        String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+
+    /**
+     * @return Opaque data that a DataFileWriter may want to associate with the
+     *         DataFile.
+     */
+    public synchronized Object getWriterData() {
+        return writerData;
+    }
+
+    /**
+     * Stores the writer data and marks this data file dirty.
+     *
+     * @param writerData - Opaque data that a DataFileWriter may want to
+     *                associate with the DataFile.
+     */
+    public synchronized void setWriterData(Object writerData) {
+        this.writerData = writerData;
+        dirty = true;
+    }
+
+    /**
+     * @return the dirty flag
+     */
+    public synchronized boolean isDirty() {
+        return dirty;
+    }
+
+    /**
+     * @param value the new dirty flag
+     */
+    public synchronized void setDirty(boolean value) {
+        this.dirty = value;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataItem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataItem.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataItem.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataItem.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import org.apache.kahadb.StoreLocation;
+
+/**
+ * A wrapper for the location of a piece of data in the store: the data file
+ * number, the offset and the size.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public final class DataItem implements Item, StoreLocation {
+
+    private int file = (int)POSITION_NOT_SET;
+    private long offset = POSITION_NOT_SET;
+    private int size;
+
+    /**
+     * Creates an item whose file and offset are both POSITION_NOT_SET.
+     */
+    public DataItem() {
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param item the item whose file, offset and size are copied
+     */
+    DataItem(DataItem item) {
+        this.size = item.size;
+        this.offset = item.offset;
+        this.file = item.file;
+    }
+
+    /**
+     * @return true once the file differs from POSITION_NOT_SET
+     */
+    boolean isValid() {
+        return this.file != POSITION_NOT_SET;
+    }
+
+    /**
+     * @return the size
+     * @see org.apache.kahadb.StoreLocation#getSize()
+     */
+    public int getSize() {
+        return this.size;
+    }
+
+    /**
+     * @param size The size to set.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the offset
+     * @see org.apache.kahadb.StoreLocation#getOffset()
+     */
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /**
+     * @param offset The offset to set.
+     */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return the file number
+     * @see org.apache.kahadb.StoreLocation#getFile()
+     */
+    public int getFile() {
+        return this.file;
+    }
+
+    /**
+     * @param file The file to set.
+     */
+    public void setFile(int file) {
+        this.file = file;
+    }
+
+    /**
+     * @return a pretty print listing offset, file and size
+     */
+    public String toString() {
+        return "offset = " + offset + ", file = " + file + ", size = " + size;
+    }
+
+    /**
+     * @return a new DataItem carrying the same file, offset and size
+     */
+    public DataItem copy() {
+        DataItem duplicate = new DataItem(this);
+        return duplicate;
+    }
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataItem.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataManagerImpl.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataManagerImpl.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/DataManagerImpl.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.impl.DataManager;
+import org.apache.kahadb.impl.index.RedoStoreIndexItem;
+import org.apache.kahadb.util.IOExceptionSupport;
+
+/**
+ * Manages DataFiles
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataManagerImpl implements DataManager {
+
+    public static final int ITEM_HEAD_SIZE = 5; // type + length
+    public static final byte DATA_ITEM_TYPE = 1;
+    public static final byte REDO_ITEM_TYPE = 2;
+    public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    
+    private static final Log LOG = LogFactory.getLog(DataManagerImpl.class);
+    private static final String NAME_PREFIX = "data-";
+    
+    private final File directory;
+    private final String name;
+    private SyncDataFileReader reader;
+    private SyncDataFileWriter writer;
+    private DataFile currentWriteFile;
+    private long maxFileLength = MAX_FILE_LENGTH;
+    private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
+    private String dataFilePrefix;
+    private final AtomicLong storeSize;
+
+    /**
+     * Creates a manager for the data files of one named store and registers
+     * the files that already exist in the directory.
+     * <p>
+     * Existing files are recognised by the prefix
+     * <code>data-&lt;name&gt;-</code> followed by the file number. Each one is
+     * added to the file map, its length is added to the shared store size, and
+     * the file with the highest number becomes the current write file.
+     *
+     * @param dir the directory holding the data files
+     * @param name the store name used to build the data file prefix
+     * @param storeSize shared counter that is increased by the length of every
+     *            existing data file
+     */
+    public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
+        this.directory = dir;
+        this.name = name;
+        this.storeSize=storeSize;
+
+        dataFilePrefix = NAME_PREFIX + name + "-";
+        // build up list of current dataFiles
+        File[] files = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(dataFilePrefix);
+            }
+        });
+        if (files != null) {
+            for (File file : files) {
+                String numStr = file.getName().substring(dataFilePrefix.length());
+                int num;
+                try {
+                    num = Integer.parseInt(numStr);
+                } catch (NumberFormatException e) {
+                    // The prefix match alone is not enough: a store named
+                    // "a-b" produces files that also start with the prefix
+                    // of a store named "a". Skip anything whose suffix is
+                    // not a plain file number instead of failing.
+                    LOG.debug("Ignoring file " + file + ": suffix is not a data file number");
+                    continue;
+                }
+                DataFile dataFile = new DataFile(file, num);
+                storeSize.addAndGet(dataFile.getLength());
+                fileMap.put(dataFile.getNumber(), dataFile);
+                if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
+                    currentWriteFile = dataFile;
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the DataFile for the given number and registers it in the file
+     * map under that number.
+     *
+     * @param num the data file number, appended to the data file prefix to
+     *            form the file name
+     * @return the newly registered data file
+     */
+    private DataFile createAndAddDataFile(int num) {
+        File target = new File(directory, dataFilePrefix + num);
+        DataFile created = new DataFile(target, num);
+        fileMap.put(created.getNumber(), created);
+        return created;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * Returns the store name this manager was constructed with.
+     * NOTE(review): the reference below still names the old
+     * org.apache.activemq.kaha package; presumably it should point at
+     * org.apache.kahadb.impl.DataManager - confirm.
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Reserves room for an item in the current write file, rolling over to a
+     * new data file when the item would not fit.
+     * <p>
+     * On return the item carries the file number and offset it has to be
+     * written at, and both the file length and the shared store size have
+     * been increased by the payload size plus the record header.
+     *
+     * @param item the item to place; only its size is read
+     * @return the data file the item has to be written to
+     * @throws IOException if an unused write file cannot be removed
+     */
+    synchronized DataFile findSpaceForData(DataItem item) throws IOException {
+        // Bytes the record occupies on disk: payload plus type/length header.
+        // The roll-over check uses the same figure that is reserved below, so
+        // the header can no longer push a file past maxFileLength.
+        int len = item.getSize() + ITEM_HEAD_SIZE;
+        if (currentWriteFile == null || ((currentWriteFile.getLength() + len) > maxFileLength)) {
+            int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
+            if (currentWriteFile != null && currentWriteFile.isUnused()) {
+                removeDataFile(currentWriteFile);
+            }
+            currentWriteFile = createAndAddDataFile(nextNum);
+        }
+        item.setOffset(currentWriteFile.getLength());
+        item.setFile(currentWriteFile.getNumber().intValue());
+        currentWriteFile.incrementLength(len);
+        storeSize.addAndGet(len);
+        return currentWriteFile;
+    }
+
+    /**
+     * Looks up the data file a location refers to.
+     *
+     * @param item the location whose file number is looked up
+     * @return the registered data file with that number
+     * @throws IOException if no data file is registered under that number
+     */
+    DataFile getDataFile(StoreLocation item) throws IOException {
+        Integer key = Integer.valueOf(item.getFile());
+        DataFile found = fileMap.get(key);
+        if (found != null) {
+            return found;
+        }
+        LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+        throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
+     *      org.apache.activemq.kaha.StoreLocation)
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        return getReader().readItem(marshaller, item);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
+     *      java.lang.Object)
+     */
+    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+        return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
+     */
+    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
+        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
+    }
+
+    /**
+     * Rewrites the record at an existing location as a DATA_ITEM_TYPE record
+     * by delegating to the writer.
+     *
+     * @param location the location to overwrite; it is cast to DataItem, so
+     *            any other StoreLocation implementation fails with a
+     *            ClassCastException
+     * @param marshaller the marshaller used to encode the payload
+     * @param payload the object to store
+     * @throws IOException if the writer fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
+        throws IOException {
+        DataItem target = (DataItem)location;
+        getWriter().updateItem(target, marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /**
+     * Scans the current write file from offset 0 and hands every redo record
+     * found to the listener.
+     * <p>
+     * The scan reads one record header at a time and stops silently at the
+     * first header or redo payload that cannot be read, which is treated as
+     * the end of the valid data. Records of any other type are skipped.
+     *
+     * @param listener receives each redo record together with its location
+     * @throws IOException if the listener throws; the original exception is
+     *             kept as the cause
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
+
+        // Nothing to recover if there is no current file.
+        if (currentWriteFile == null) {
+            return;
+        }
+
+        DataItem item = new DataItem();
+        item.setFile(currentWriteFile.getNumber().intValue());
+        item.setOffset(0);
+        while (true) {
+            byte type;
+            try {
+                type = getReader().readDataItemSize(item);
+            } catch (IOException ignore) {
+                LOG.trace("End of data file reached at (header was invalid): " + item);
+                return;
+            }
+            if (item.getSize() < 0) {
+                // A negative length can only come from a torn or corrupt
+                // header. Reading on would either fail with an unchecked
+                // NegativeArraySizeException or move the offset backwards,
+                // so treat it like any other invalid header.
+                LOG.trace("End of data file reached at (header was invalid): " + item);
+                return;
+            }
+            if (type == REDO_ITEM_TYPE) {
+                // Un-marshal the redo item
+                Object object;
+                try {
+                    object = readItem(redoMarshaller, item);
+                } catch (IOException e1) {
+                    LOG.trace("End of data file reached at (payload was invalid): " + item);
+                    return;
+                }
+                try {
+
+                    listener.onRedoItem(item, object);
+                    // in case the listener is holding on to item references,
+                    // copy it
+                    // so we don't change it behind the listener's back.
+                    item = item.copy();
+
+                } catch (Exception e) {
+                    throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
+                }
+            }
+            // Move to the next item.
+            item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
+        }
+    }
+
+    /**
+     * Forces and closes every registered data file, empties the file map and
+     * then closes the writer.
+     * <p>
+     * The writer is closed last, and also when forcing or closing a file
+     * fails, so it is never used after it has been closed.
+     *
+     * @throws IOException if forcing or closing a data file, or closing the
+     *             writer, fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void close() throws IOException {
+        SyncDataFileWriter activeWriter = getWriter();
+        try {
+            for (DataFile dataFile : fileMap.values()) {
+                activeWriter.force(dataFile);
+                dataFile.close();
+            }
+            fileMap.clear();
+        } finally {
+            activeWriter.close();
+        }
+    }
+
+    /**
+     * Asks the writer to force every registered data file.
+     *
+     * @throws IOException if forcing a data file fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void force() throws IOException {
+        for (DataFile dataFile : fileMap.values()) {
+            getWriter().force(dataFile);
+        }
+    }
+
+    /**
+     * Deletes every registered data file, subtracts each file's length from
+     * the shared store size and empties the file map.
+     *
+     * @return true only if every data file reported a successful delete
+     * @throws IOException if deleting a data file fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized boolean delete() throws IOException {
+        boolean allDeleted = true;
+        for (DataFile dataFile : fileMap.values()) {
+            storeSize.addAndGet(-dataFile.getLength());
+            // Always attempt the delete, even after an earlier failure.
+            boolean deleted = dataFile.delete();
+            allDeleted = allDeleted && deleted;
+        }
+        fileMap.clear();
+        return allDeleted;
+    }
+
+    /**
+     * Adds one reference to the data file with the given number, registering
+     * a new data file under that number if none is known yet. Negative
+     * numbers are ignored.
+     *
+     * @param file the data file number
+     * @throws IOException declared by the interface; not thrown by the
+     *             visible code path
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void addInterestInFile(int file) throws IOException {
+        if (file < 0) {
+            return;
+        }
+        DataFile dataFile = fileMap.get(Integer.valueOf(file));
+        if (dataFile == null) {
+            dataFile = createAndAddDataFile(file);
+        }
+        addInterestInFile(dataFile);
+    }
+
+    /**
+     * Adds one reference to the given data file; a null file is ignored.
+     *
+     * @param dataFile the data file to reference, may be null
+     */
+    synchronized void addInterestInFile(DataFile dataFile) {
+        if (dataFile == null) {
+            return;
+        }
+        dataFile.increment();
+    }
+
+    /**
+     * Drops one reference to the data file with the given number. Negative
+     * numbers and numbers with no registered file are ignored.
+     *
+     * @param file the data file number
+     * @throws IOException if the file becomes unreferenced and removing it
+     *             fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void removeInterestInFile(int file) throws IOException {
+        if (file < 0) {
+            return;
+        }
+        DataFile dataFile = fileMap.get(Integer.valueOf(file));
+        removeInterestInFile(dataFile);
+    }
+
+    /**
+     * Drops one reference to the given data file and removes the file once
+     * its count is zero or less, unless it is the current write file. A null
+     * file is ignored.
+     *
+     * @param dataFile the data file to release, may be null
+     * @throws IOException if removing the data file fails
+     */
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+        if (dataFile == null) {
+            return;
+        }
+        boolean unreferenced = dataFile.decrement() <= 0;
+        if (unreferenced && dataFile != currentWriteFile) {
+            removeDataFile(dataFile);
+        }
+    }
+
+    /**
+     * Removes every registered data file that is unused, except the current
+     * write file.
+     *
+     * @throws IOException if removing a data file fails
+     * @see org.apache.kahadb.impl.DataManager
+     */
+    public synchronized void consolidateDataFiles() throws IOException {
+        // Collect first: removeDataFile() modifies fileMap, which must not
+        // happen while its values are being iterated.
+        List<DataFile> unusedFiles = new ArrayList<DataFile>();
+        for (DataFile candidate : fileMap.values()) {
+            if (candidate != currentWriteFile && candidate.isUnused()) {
+                unusedFiles.add(candidate);
+            }
+        }
+        for (DataFile unused : unusedFiles) {
+            removeDataFile(unused);
+        }
+    }
+
+    /**
+     * Unregisters a data file, forces it through the writer if a writer
+     * exists, subtracts its length from the shared store size and deletes it.
+     * The outcome of the delete is only logged, not reported to the caller.
+     *
+     * @param dataFile the data file to discard
+     * @throws IOException if forcing or deleting the file fails
+     */
+    private void removeDataFile(DataFile dataFile) throws IOException {
+        fileMap.remove(dataFile.getNumber());
+        if (writer != null) {
+            writer.force(dataFile);
+        }
+        storeSize.addAndGet(-dataFile.getLength());
+        boolean result = dataFile.delete();
+        // Separator added: the outcome used to run straight into the file
+        // description, making the message hard to read.
+        LOG.debug("discarding data file " + dataFile + (result ? " successful" : " failed"));
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * Returns the marshaller used for redo records.
+     * NOTE(review): the reference below still names the old
+     * org.apache.activemq.kaha package - confirm the intended target.
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
+     */
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * Replaces the marshaller used for redo records.
+     * NOTE(review): the reference below still names the old
+     * org.apache.activemq.kaha package - confirm the intended target.
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
+     */
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
+    }
+
+    /**
+     * @return the length threshold that findSpaceForData compares against
+     *         when deciding whether to start a new data file
+     */
+    public long getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the length threshold that findSpaceForData
+     *            compares against when deciding whether to start a new data
+     *            file
+     */
+    public void setMaxFileLength(long maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return "DataManager:(" + NAME_PREFIX + name + ")";
+    }
+
+    public synchronized SyncDataFileReader getReader() {
+        if (reader == null) {
+            reader = createReader();
+        }
+        return reader;
+    }
+
+    /**
+     * @return a new reader bound to this manager
+     */
+    protected synchronized SyncDataFileReader createReader() {
+        SyncDataFileReader created = new SyncDataFileReader(this);
+        return created;
+    }
+
+    /**
+     * @param reader the reader to use from now on; if null, getReader()
+     *            creates a new one on its next call
+     */
+    public synchronized void setReader(SyncDataFileReader reader) {
+        this.reader = reader;
+    }
+
+    public synchronized SyncDataFileWriter getWriter() {
+        if (writer == null) {
+            writer = createWriter();
+        }
+        return writer;
+    }
+
+    /**
+     * @return a new writer bound to this manager
+     */
+    private SyncDataFileWriter createWriter() {
+        SyncDataFileWriter created = new SyncDataFileWriter(this);
+        return created;
+    }
+
+    /**
+     * @param writer the writer to use from now on; if null, getWriter()
+     *            creates a new one on its next call
+     */
+    public synchronized void setWriter(SyncDataFileWriter writer) {
+        this.writer = writer;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/Item.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/Item.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/Item.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/Item.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+/**
+ * A wrapper for data in the store
+ * 
+ * @version $Revision: 1.2 $
+ */
+public interface Item {
+    long POSITION_NOT_SET = -1;
+    short MAGIC = 31317;
+    int ACTIVE = 22;
+    int FREE = 33;
+    int LOCATION_SIZE = 24;
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/Item.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/RedoListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/RedoListener.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/RedoListener.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/RedoListener.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import org.apache.kahadb.StoreLocation;
+
+
+/**
+ * Callback that receives the redo records found during recovery.
+ */
+public interface RedoListener {
+
+    /**
+     * Called once for each redo record that was read.
+     *
+     * @param item the location of the redo record
+     * @param object the un-marshalled redo payload
+     * @throws Exception if the listener cannot process the record
+     */
+    void onRedoItem(StoreLocation item, Object object) throws Exception;
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileReader.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileReader.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileReader.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+
+/**
+ * Optimized Store reader. Reads record headers and payloads synchronously
+ * from the data files of a DataManagerImpl.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class SyncDataFileReader {
+
+    private final DataManagerImpl dataManager;
+    private final DataByteArrayInputStream dataIn;
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param fileManager the manager used to look up the data file of an item
+     */
+    SyncDataFileReader(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.dataIn = new DataByteArrayInputStream();
+    }
+
+    /**
+     * Reads the record header at the item's offset: one type byte followed by
+     * the payload length, which is stored into the item.
+     *
+     * @param item supplies the file number and offset; receives the size
+     * @return the record type byte
+     * @throws IOException if the data file is unknown or the header cannot be
+     *             read completely
+     */
+    public synchronized byte readDataItemSize(DataItem item) throws IOException {
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+        file.seek(item.getOffset()); // jump to the start of the record header
+        byte rc = file.readByte();
+        item.setSize(file.readInt());
+        return rc;
+    }
+
+    /**
+     * Reads the payload of the record at the given location and un-marshals
+     * it.
+     *
+     * @param marshaller the marshaller used to decode the payload
+     * @param item supplies the file number, offset and payload size
+     * @return the object produced by the marshaller
+     * @throws IOException if the size is negative, the data file is unknown,
+     *             or the payload cannot be read completely
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        // A negative size means the location came from a corrupt header.
+        // Report it as an I/O problem, which callers already handle, instead
+        // of letting the array allocation fail with an unchecked exception.
+        int size = item.getSize();
+        if (size < 0) {
+            throw new IOException("Invalid item size " + size + " at " + item);
+        }
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
+        byte[] data = new byte[size];
+        file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
+        file.readFully(data);
+        dataIn.restart(data);
+        return marshaller.readPayload(dataIn);
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileWriter.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileWriter.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/SyncDataFileWriter.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+/**
+ * Optimized Store writer. Marshals each payload into an internal buffer and
+ * writes it to the data file synchronously. Simple, but may introduce a bit
+ * of contention when put under load.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class SyncDataFileWriter {
+
+    private DataByteArrayOutputStream buffer;
+    private DataManagerImpl dataManager;
+
+    /**
+     * Construct a Store writer
+     * 
+     * @param fileManager the manager that assigns file space and resolves
+     *            data files
+     */
+    SyncDataFileWriter(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.buffer = new DataByteArrayOutputStream();
+    }
+
+    /**
+     * Marshals a payload and writes it as a new record at the position the
+     * manager assigns, then adds a reference to the data file.
+     *
+     * @param marshaller the marshaller used to encode the payload
+     * @param payload the object to store
+     * @param type the record type byte written into the header
+     * @return the location of the stored record
+     * @throws IOException if marshalling or writing fails
+     */
+    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+
+        int recordSize = fillBuffer(marshaller, payload, type);
+
+        // Find the position where this item will land at.
+        DataItem location = new DataItem();
+        location.setSize(recordSize - DataManagerImpl.ITEM_HEAD_SIZE);
+        DataFile dataFile = dataManager.findSpaceForData(location);
+
+        // Now splat the buffer to the file.
+        writeBuffer(dataFile, location, recordSize);
+
+        dataManager.addInterestInFile(dataFile);
+        return location;
+    }
+
+    /**
+     * Marshals a payload and writes it over the record at an existing
+     * location, updating the size stored in the item.
+     * <p>
+     * NOTE(review): nothing here compares the new payload size with the
+     * space of the original record; presumably callers only update with
+     * payloads that fit - confirm.
+     *
+     * @param item the location to overwrite; receives the new payload size
+     * @param marshaller the marshaller used to encode the payload
+     * @param payload the object to store
+     * @param type the record type byte written into the header
+     * @throws IOException if marshalling or writing fails, or the data file
+     *             is unknown
+     */
+    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+        int recordSize = fillBuffer(marshaller, payload, type);
+        item.setSize(recordSize - DataManagerImpl.ITEM_HEAD_SIZE);
+        writeBuffer(dataManager.getDataFile(item), item, recordSize);
+    }
+
+    /**
+     * Syncs a data file to disk if it has been written to since the last
+     * sync, then clears its dirty markers.
+     *
+     * @param dataFile the data file to sync
+     * @throws IOException if the sync fails
+     */
+    public synchronized void force(DataFile dataFile) throws IOException {
+        // Only files carrying our dirty marker need a sync.
+        boolean needsSync = dataFile.getWriterData() != null && dataFile.isDirty();
+        if (!needsSync) {
+            return;
+        }
+        dataFile.getRandomAccessFile().getFD().sync();
+        dataFile.setWriterData(null);
+        dataFile.setDirty(false);
+    }
+
+    /**
+     * Does nothing; this writer holds no resources of its own.
+     *
+     * @throws IOException never thrown by this implementation
+     */
+    public void close() throws IOException {
+    }
+
+    /**
+     * Fills the internal buffer with a complete record: the payload is
+     * marshalled behind a gap left for the header, then the type byte and
+     * the payload length are written into that gap.
+     *
+     * @return the total record size in the buffer, header included
+     */
+    private int fillBuffer(Marshaller marshaller, Object payload, byte type) throws IOException {
+        // Write the packet to our internal buffer.
+        buffer.reset();
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload, buffer);
+        int recordSize = buffer.size();
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(recordSize - DataManagerImpl.ITEM_HEAD_SIZE);
+        return recordSize;
+    }
+
+    /**
+     * Writes the first recordSize bytes of the internal buffer to the data
+     * file at the location's offset and marks the file as dirty.
+     */
+    private void writeBuffer(DataFile dataFile, DataItem location, int recordSize) throws IOException {
+        RandomAccessFile file = dataFile.getRandomAccessFile();
+        file.seek(location.getOffset());
+        file.write(buffer.getData(), 0, recordSize);
+        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Data containers for Kaha. These are rolling data files that are discarded when they are no longer required.
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/BadMagicException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/BadMagicException.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/BadMagicException.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/BadMagicException.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.index;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a bad magic value is found while reading a file.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class BadMagicException extends IOException {
+
+    /** Serialization version identifier. */
+    private static final long serialVersionUID = -570930196733067056L;
+
+    /**
+     * Creates an exception without a detail message.
+     */
+    public BadMagicException() {
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     * 
+     * @param s the reason, available through getMessage()
+     */
+    public BadMagicException(String s) {
+        super(s);
+    }
+}



Mime
View raw message