activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r677944 [3/11] - in /activemq/sandbox/kahadb: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/impl/ src/main/java/org/apache/kahadb/impl/async/ s...
Date Fri, 18 Jul 2008 15:49:52 GMT
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAppender.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAppender.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFileAppender {
+
+    // Filler bytes written into the reserved part of every record header.
+    protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
+    // Initial value of maxWriteBatchSize (4 * 1024 * 1024 bytes).
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    // Allocates record locations and is told the last appended location.
+    protected final AsyncDataManager dataManager;
+    // Non-sync writes that have been enqueued; processQueue() removes each
+    // entry after its batch has been written.
+    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    // Monitor used for the hand-off of nextWriteBatch between enqueue() and
+    // processQueue(); shutdown, running and firstAsyncException are also
+    // accessed while holding it.
+    protected final Object enqueueMutex = new Object(){};
+    // The batch waiting to be picked up by the writer thread, or null.
+    protected WriteBatch nextWriteBatch;
+
+    // Set by close(); once set, enqueue() rejects new writes.
+    protected boolean shutdown;
+    // The IOException that terminated processQueue(), if any; rethrown by enqueue().
+    protected IOException firstAsyncException;
+    // Released when the writer thread exits (or by close() if it never started).
+    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+    // Size threshold used by WriteBatch.canAppend() and to size the write buffer.
+    protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
+
+    // True once the writer thread has been started by enqueue().
+    private boolean running;
+    // The writer thread that runs processQueue().
+    private Thread thread;
+
+    /**
+     * Map key identifying a write by the data file id and offset of its
+     * location. The hash code is computed once at construction time.
+     */
+    public static class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        /**
+         * @param item the location whose data file id and offset form the key
+         */
+        public WriteKey(Location item) {
+            this.file = item.getDataFileId();
+            this.offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            this.hash = (int)(this.file ^ this.offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        /**
+         * Two keys are equal when both the file id and the offset match.
+         */
+        public boolean equals(Object obj) {
+            if (!(obj instanceof WriteKey)) {
+                return false;
+            }
+            WriteKey other = (WriteKey)obj;
+            return other.file == file && other.offset == offset;
+        }
+    }
+
+    /**
+     * A chain of write commands destined for the same data file that the
+     * writer thread processes as one unit. All commands of a batch share the
+     * same latch.
+     */
+    public class WriteBatch {
+
+        public final DataFile dataFile;
+        public final WriteCommand first;
+        public final CountDownLatch latch = new CountDownLatch(1);
+        public int size;
+
+        /**
+         * Starts a batch containing a single write.
+         *
+         * @param dataFile the file every write of this batch targets
+         * @param write the first command of the batch
+         */
+        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+            this.dataFile = dataFile;
+            this.first = write;
+            this.size = write.location.getSize();
+        }
+
+        /**
+         * @return true when the write targets this batch's data file and adding
+         *         it keeps the batch size below maxWriteBatchSize
+         */
+        public boolean canAppend(DataFile dataFile, WriteCommand write) {
+            return dataFile == this.dataFile
+                && size + write.location.getSize() < maxWriteBatchSize;
+        }
+
+        /**
+         * Links the write behind the current tail of the chain and adds its
+         * size to the batch size.
+         */
+        public void append(WriteCommand write) throws IOException {
+            this.first.getTailNode().linkAfter(write);
+            size = size + write.location.getSize();
+        }
+    }
+
+    /**
+     * A single record to append: its target location, its payload, and either
+     * a sync flag or a completion callback.
+     */
+    public static class WriteCommand extends LinkedNode {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+        public final Runnable onComplete;
+
+        /**
+         * Creates a command without a completion callback.
+         *
+         * @param sync the sync flag recorded on the command
+         */
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.onComplete = null;
+            this.sync = sync;
+        }
+
+        /**
+         * Creates a non-sync command carrying a completion callback.
+         *
+         * @param onComplete callback stored on the command
+         */
+        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+            this.sync = false;
+            this.onComplete = onComplete;
+        }
+    }
+
+
+    /**
+     * Creates an appender that writes on behalf of the given data manager and
+     * uses the in-flight write map obtained from it.
+     * 
+     * @param dataManager the manager used to allocate record locations
+     */
+    public DataFileAppender(AsyncDataManager dataManager) {
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.dataManager = dataManager;
+    }
+
+    /**
+     * Allocates a location for the record, enqueues it for the writer thread
+     * and, when <code>sync</code> is set, blocks until the latch of the batch
+     * containing the record has been released.
+     * 
+     * @param data the payload to append
+     * @param type the record type stored in the location and record header
+     * @param sync if true, wait on the batch latch before returning; if false,
+     *            the write is registered in the in-flight write map instead
+     * @return the location assigned to the record
+     * @throws IOException if the appender has been shut down, a previous
+     *             asynchronous write failed, or the calling thread is
+     *             interrupted while waiting (InterruptedIOException)
+     */
+    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+        // The record size is the payload plus the fixed header/footer space.
+        int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, sync);
+
+        // Allocate the location and enqueue within one synchronized block so
+        // that writes are enqueued in the same order in which the data manager
+        // assigned their locations (which is basically just appending).
+        synchronized (this) {
+            // Find the position where this item will land at.
+            DataFile dataFile = dataManager.allocateLocation(location);
+            if (!sync) {
+                inflightWrites.put(new WriteKey(location), write);
+            }
+            batch = enqueue(dataFile, write);
+        }
+        location.setLatch(batch.latch);
+        if (sync) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so that code further up the
+                // stack can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException();
+            }
+        }
+
+        return location;
+    }
+    
+    /**
+     * Allocates a location for the record and enqueues it for the writer
+     * thread without waiting for it to be written. The write is always
+     * registered in the in-flight write map.
+     * 
+     * @param data the payload to append
+     * @param type the record type stored in the location and record header
+     * @param onComplete callback attached to the write command
+     * @return the location assigned to the record
+     * @throws IOException if the write cannot be enqueued
+     */
+    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+        // The record size is the payload plus the fixed header/footer space.
+        final Location location = new Location();
+        location.setSize(data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE);
+        location.setType(type);
+
+        final WriteCommand command = new WriteCommand(location, data, onComplete);
+        final WriteBatch batch;
+
+        // Allocation and enqueueing happen under the same lock so that writes
+        // are enqueued in the order in which the data manager assigned their
+        // locations (which is basically just appending).
+        synchronized (this) {
+            DataFile target = dataManager.allocateLocation(location);
+            inflightWrites.put(new WriteKey(location), command);
+            batch = enqueue(target, command);
+        }
+        location.setLatch(batch.latch);
+
+        return location;
+    }
+
+    /**
+     * Adds the write to the pending batch, or starts a new batch when there is
+     * none or the pending one cannot take the write. Starts the writer thread
+     * on first use. When a new batch must be started while another is still
+     * pending, blocks until the writer thread has picked the pending one up.
+     * 
+     * @param dataFile the data file the write targets
+     * @param write the command to enqueue
+     * @return the batch the write became part of
+     * @throws IOException if the appender has been shut down, a previous
+     *             asynchronous write failed, or the calling thread is
+     *             interrupted while waiting (InterruptedIOException)
+     */
+    private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+        synchronized (enqueueMutex) {
+            if (shutdown) {
+                throw new IOException("Async Writter Thread Shutdown");
+            }
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
+            }
+
+            // Lazily start the writer thread on the first enqueue.
+            if (!running) {
+                running = true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+            }
+
+            if (nextWriteBatch != null) {
+                // Append to the current batch if possible..
+                if (nextWriteBatch.canAppend(dataFile, write)) {
+                    nextWriteBatch.append(write);
+                    return nextWriteBatch;
+                }
+
+                // Otherwise wait until the writer thread has taken the
+                // pending batch.
+                try {
+                    while (nextWriteBatch != null) {
+                        enqueueMutex.wait();
+                    }
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so that code further up
+                    // the stack can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException();
+                }
+                if (shutdown) {
+                    throw new IOException("Async Writter Thread Shutdown");
+                }
+                // The writer may have failed while we were waiting; do not
+                // enqueue a batch after a recorded failure.
+                if (firstAsyncException != null) {
+                    throw firstAsyncException;
+                }
+            }
+
+            // Start a new batch and wake the writer thread.
+            nextWriteBatch = new WriteBatch(dataFile, write);
+            enqueueMutex.notify();
+            return nextWriteBatch;
+        }
+    }
+
+    /**
+     * Marks the appender as shut down and waits until the writer thread has
+     * exited. If the writer thread was never started, returns without
+     * waiting. Calling it again after shutdown only waits.
+     * 
+     * @throws IOException if the calling thread is interrupted while waiting
+     *             (InterruptedIOException)
+     */
+    public void close() throws IOException {
+        synchronized (enqueueMutex) {
+            if (!shutdown) {
+                shutdown = true;
+                if (running) {
+                    // Wake the writer thread (and any waiting producer) so
+                    // they notice the shutdown flag.
+                    enqueueMutex.notifyAll();
+                } else {
+                    // No writer thread exists that could release the latch.
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so that code further up the stack
+            // can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    /**
+     * The async processing loop that writes to the data files and does the
+     * force calls. It runs on the writer thread started by enqueue().
+     * 
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch' or group together several file sync() requests
+     * into a single file sync() call. The batching is accomplished by attaching
+     * the same CountDownLatch instance to every force request in a group.
+     * 
+     * The loop ends when shutdown is set and no batch is pending, or when an
+     * IOException or InterruptedException occurs. An IOException is recorded in
+     * firstAsyncException. On exit the open file is closed and shutdownDone is
+     * released.
+     * 
+     * NOTE(review): when an IOException ends the loop, the latch of the batch
+     * being processed is not counted down in this method, while
+     * storeItem(data, type, true) awaits that latch - looks like such a caller
+     * would block indefinitely; confirm.
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        try {
+
+            // Reused across batches to combine several small writes into one.
+            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            // Take ownership of the pending batch.
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        // Only exit once no batch is pending.
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    // Wake a producer that is waiting in enqueue() for the
+                    // pending batch slot to become free.
+                    enqueueMutex.notify();
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                // Switch files when this batch targets a different data file
+                // than the previous one.
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                
+                boolean forceToDisk=false;
+                
+                // 
+                // is it just 1 big write?
+                if (wb.size == write.location.getSize()) {
+                    // Force when the write is sync or has a completion callback.
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
+                    // Just write it directly..
+                    file.writeInt(write.location.getSize());
+                    file.writeByte(write.location.getType());
+                    file.write(RESERVED_SPACE);
+                    file.write(AsyncDataManager.ITEM_HEAD_SOR);
+                    file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                    file.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+                        // One sync write or callback forces the whole batch.
+                        forceToDisk |= write.sync | write.onComplete!=null;
+
+                        buff.writeInt(write.location.getSize());
+                        buff.writeByte(write.location.getType());
+                        buff.write(RESERVED_SPACE);
+                        buff.write(AsyncDataManager.ITEM_HEAD_SOR);
+                        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                        buff.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Now do the 1 big write.
+                    ByteSequence sequence = buff.toByteSequence();
+                    file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                    buff.reset();
+                }
+
+                if( forceToDisk ) {
+                    file.getFD().sync();
+                }
+                
+                // Report the location of the last command of the batch.
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data has been written, remove the writes from
+                // the in flight cache and run their completion callbacks.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    if( write.onComplete !=null ) {
+                    	 try {
+							write.onComplete.run();
+						} catch (Throwable e) {
+							// A failing callback must not stop the writer loop.
+							e.printStackTrace();
+						}
+                    }
+                    write = (WriteCommand)write.getNext();
+                }
+                
+                // Signal any waiting threads that the batch has been processed.
+                wb.latch.countDown();
+            }
+        } catch (IOException e) {
+            // Record the failure; enqueue() rethrows it to later callers.
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+            // NOTE(review): interruption ends the loop silently and the
+            // interrupt status is not restored - confirm this is intended.
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (Throwable ignore) {
+                // Best-effort close; a failure here is deliberately ignored.
+            }
+            // Lets close() return.
+            shutdownDone.countDown();
+        }
+    }
+
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataManagerFacade.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataManagerFacade.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataManagerFacade.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataManagerFacade.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.IOException;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.impl.data.RedoListener;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+/**
+ * Provides a Kaha DataManager Facade to the DataManager.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataManagerFacade implements org.apache.kahadb.impl.DataManager {
+
+    private static final ByteSequence FORCE_COMMAND = new ByteSequence(new byte[] {'F', 'O', 'R', 'C', 'E'});
+
+    private AsyncDataManager dataManager;
+    private final String name;
+    private Marshaller redoMarshaller;
+
+    private static class StoreLocationFacade implements StoreLocation {
+        private final Location location;
+
+        public StoreLocationFacade(Location location) {
+            this.location = location;
+        }
+
+        public int getFile() {
+            return location.getDataFileId();
+        }
+
+        public long getOffset() {
+            return location.getOffset();
+        }
+
+        public int getSize() {
+            return location.getSize();
+        }
+
+        public Location getLocation() {
+            return location;
+        }
+    }
+
+    public DataManagerFacade(AsyncDataManager dataManager, String name) {
+        this.dataManager = dataManager;
+        this.name = name;
+    }
+
+    private static StoreLocation convertToStoreLocation(Location location) {
+        if (location == null) {
+            return null;
+        }
+        return new StoreLocationFacade(location);
+    }
+
+    private static Location convertFromStoreLocation(StoreLocation location) {
+
+        if (location == null) {
+            return null;
+        }
+
+        if (location.getClass() == StoreLocationFacade.class) {
+            return ((StoreLocationFacade)location).getLocation();
+        }
+
+        Location l = new Location();
+        l.setOffset((int)location.getOffset());
+        l.setSize(location.getSize());
+        l.setDataFileId(location.getFile());
+        return l;
+    }
+
+
+    public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
+        ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
+        DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
+        return marshaller.readPayload(dataIn);
+    }
+
+    public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload, buffer);
+        ByteSequence data = buffer.toByteSequence();
+        return convertToStoreLocation(dataManager.write(data, (byte)1, false));
+    }
+
+    public void force() throws IOException {
+        dataManager.write(FORCE_COMMAND, (byte)2, true);
+    }
+
+    public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload, buffer);
+        ByteSequence data = buffer.toByteSequence();
+        dataManager.update(convertFromStoreLocation(location), data, false);
+    }
+
+    public void close() throws IOException {
+        dataManager.close();
+    }
+
+    public void consolidateDataFiles() throws IOException {
+        dataManager.consolidateDataFiles();
+    }
+
+    public boolean delete() throws IOException {
+        return dataManager.delete();
+    }
+
+    public void addInterestInFile(int file) throws IOException {
+        dataManager.addInterestInFile(file);
+    }
+
+    public void removeInterestInFile(int file) throws IOException {
+        dataManager.removeInterestInFile(file);
+    }
+
+    public void recoverRedoItems(RedoListener listener) throws IOException {
+        throw new RuntimeException("Not Implemented..");
+    }
+
+    public StoreLocation storeRedoItem(Object payload) throws IOException {
+        throw new RuntimeException("Not Implemented..");
+    }
+
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/Location.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/Location.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/Location.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Used as a location in the data store.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public final class Location implements Comparable<Location> {
+
+    public static final byte MARK_TYPE = -1;
+    public static final byte USER_TYPE = 1;
+    public static final byte NOT_SET_TYPE = 0;
+    public static final int NOT_SET = -1;
+
+    private int dataFileId = NOT_SET;
+    private int offset = NOT_SET;
+    private int size = NOT_SET;
+    private byte type = NOT_SET_TYPE;
+    // Not copied by the copy constructor and not part of the external form.
+    private CountDownLatch latch;
+
+    /**
+     * Creates a location whose file id, offset and size are NOT_SET and whose
+     * type is NOT_SET_TYPE.
+     */
+    public Location() {
+    }
+
+    /**
+     * Copies file id, offset, size and type of the given location; the latch
+     * is not copied.
+     */
+    Location(Location item) {
+        this.dataFileId = item.dataFileId;
+        this.offset = item.offset;
+        this.size = item.size;
+        this.type = item.type;
+    }
+
+    /**
+     * @return true when a data file id has been set
+     */
+    boolean isValid() {
+        return dataFileId != NOT_SET;
+    }
+
+    /**
+     * @return the size of the data record including the header.
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size the size of the data record including the header.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the size of the payload of the record.
+     */
+    public int getPaylodSize() {
+        return size - AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    public int getDataFileId() {
+        return dataFileId;
+    }
+
+    public void setDataFileId(int file) {
+        this.dataFileId = file;
+    }
+
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    public String toString() {
+        String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size + ", type = "
+                        + type;
+        return result;
+    }
+
+    /**
+     * Writes file id, offset and size as ints followed by the type as a byte.
+     */
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    /**
+     * Reads the fields in the order written by
+     * {@link #writeExternal(DataOutput)}.
+     */
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    /**
+     * Orders locations by data file id and then by offset.
+     *
+     * @return a negative value, zero or a positive value when this location
+     *         is before, equal to or after the given one
+     */
+    public int compareTo(Location o) {
+        // Compare explicitly instead of subtracting: a difference of two
+        // ints can overflow and yield the wrong sign.
+        if (dataFileId != o.dataFileId) {
+            return dataFileId < o.dataFileId ? -1 : 1;
+        }
+        if (offset != o.offset) {
+            return offset < o.offset ? -1 : 1;
+        }
+        return 0;
+    }
+
+    /**
+     * Two locations are equal when they have the same data file id and
+     * offset; size, type and latch are not considered.
+     */
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof Location) {
+            result = compareTo((Location)o) == 0;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return dataFileId ^ offset;
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/Location.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/NIODataFileAppender.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/NIODataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/NIODataFileAppender.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * An AsyncDataFileAppender that uses NIO ByteBuffers and File channels to more
+ * efficiently copy data to files.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class NIODataFileAppender extends DataFileAppender {
+
+    public NIODataFileAppender(AsyncDataManager fileManager) {
+        super(fileManager);
+    }
+
+    /**
+     * The async processing loop that writes to the data files and does the
+     * force calls.
+     * 
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch' or group together several file sync() requests
+     * into a single file sync() call. The batching is accomplished attaching
+     * the same CountDownLatch instance to every force request in a group.
+     * 
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        FileChannel channel = null;
+
+        try {
+
+            ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
+            ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
+            ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);
+
+            // Populate the static parts of the headers and footers..
+            header.putInt(0); // size
+            header.put((byte)0); // type
+            header.put(RESERVED_SPACE); // reserved
+            header.put(AsyncDataManager.ITEM_HEAD_SOR);
+            footer.put(AsyncDataManager.ITEM_HEAD_EOR);
+
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                    channel = file.getChannel();
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                
+                boolean forceToDisk=false;
+                
+                // 
+                // is it just 1 big write?
+                if (wb.size == write.location.getSize()) {
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
+                    header.clear();
+                    header.putInt(write.location.getSize());
+                    header.put(write.location.getType());
+                    header.clear();
+                    transfer(header, channel);
+                    ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                        write.data.getLength());
+                    transfer(source, channel);
+                    footer.clear();
+                    transfer(footer, channel);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+                        forceToDisk |= write.sync | write.onComplete!=null;
+                        
+                        header.clear();
+                        header.putInt(write.location.getSize());
+                        header.put(write.location.getType());
+                        header.clear();
+                        copy(header, buffer);
+                        assert !header.hasRemaining();
+
+                        ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                            write.data.getLength());
+                        copy(source, buffer);
+                        assert !source.hasRemaining();
+
+                        footer.clear();
+                        copy(footer, buffer);
+                        assert !footer.hasRemaining();
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Fully write out the buffer..
+                    buffer.flip();
+                    transfer(buffer, channel);
+                    buffer.clear();
+                }
+
+                if( forceToDisk ) {
+                    file.getChannel().force(false);
+                }
+
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data is on disk, remove the writes from the in
+                // flight
+                // cache.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    if (write.onComplete != null) {
+						try {
+							write.onComplete.run();
+						} catch (Throwable e) {
+							e.printStackTrace();
+						}
+					}
+                    write = (WriteCommand)write.getNext();
+                }
+                
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+            }
+
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (IOException e) {
+            }
+            shutdownDone.countDown();
+        }
+    }
+
+    /**
+     * Drains a buffer into a file channel.
+     *
+     * @param header - buffer whose remaining bytes are written out
+     * @param channel - channel that receives the bytes
+     * @throws IOException if the channel write fails
+     */
+    private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
+        // write() is called repeatedly until the buffer reports nothing remaining.
+        for (;;) {
+            if (!header.hasRemaining()) {
+                return;
+            }
+            channel.write(header);
+        }
+    }
+
+    private int copy(ByteBuffer src, ByteBuffer dest) {
+        int rc = Math.min(dest.remaining(), src.remaining());
+        if (rc > 0) {
+            // Adjust our limit so that we don't overflow the dest buffer.
+            int limit = src.limit();
+            src.limit(src.position() + rc);
+            dest.put(src);
+            // restore the limit.
+            src.limit(limit);
+        }
+        return rc;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyAsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyAsyncDataManager.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyAsyncDataManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyAsyncDataManager.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.Scheduler;
+
+/**
+ * An AsyncDataManager that works in read only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ */
+public class ReadOnlyAsyncDataManager extends AsyncDataManager {
+    
+    private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class);
+    private final ArrayList<File> dirs;
+
+    public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
+        this.dirs = dirs;
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+                
+        ArrayList<File> files = new ArrayList<File>();
+        for (File directory : dirs) {
+            final File d = directory;
+            File[] f = d.listFiles(new FilenameFilter() {
+                public boolean accept(File dir, String n) {
+                    return dir.equals(d) && n.startsWith(filePrefix);
+                }
+            });
+            for (int i = 0; i < f.length; i++) {
+                files.add(f[i]);
+            }
+        }
+       
+        for (File file : files) {
+            try {
+                String n = file.getName();
+                String numStr = n.substring(filePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                storeSize.addAndGet(dataFile.getLength());
+            } catch (NumberFormatException e) {
+                // Ignore file that do not match the pattern.
+            }
+        }
+
+        // Sort the list so that we can link the DataFiles together in the
+        // right order.
+        List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(dataFiles);
+        currentWriteFile = null;
+        for (DataFile df : dataFiles) {
+            if (currentWriteFile != null) {
+                currentWriteFile.linkAfter(df);
+            }
+            currentWriteFile = df;
+            fileByFileMap.put(df.getFile(), df);
+        }
+        
+        // Need to check the current Write File to see if there was a partial
+        // write to it.
+        if (currentWriteFile != null) {
+
+            // See if the lastSyncedLocation is valid..
+            Location l = lastAppendLocation.get();
+            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+                l = null;
+            }
+        }
+    }
+    
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        accessorPool.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        started = false;
+    }
+
+    
+    public Location getFirstLocation() throws IllegalStateException, IOException {
+        if( currentWriteFile == null ) {
+            return null;
+        }
+        
+        DataFile first = (DataFile)currentWriteFile.getHeadNode();
+        Location cur = new Location();
+        cur.setDataFileId(first.getDataFileId());
+        cur.setOffset(0);
+        cur.setSize(0);
+        return getNextLocation(cur);
+    }
+    
+    @Override
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
+    }    
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyDataFile.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyDataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ReadOnlyDataFile.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Allows you to open a data file in read only mode.  Useful when working with 
+ * archived data files.
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+    ReadOnlyDataFile(File file, int number, int preferedSize) {
+        super(file, number, preferedSize);
+    }
+
+    /**
+     * Opens the data file in read only ("r") mode.
+     *
+     * @param appender ignored; the file is never pre-sized because resizing
+     *            is a write operation and the handle is opened read only
+     * @return a new read only handle on the data file
+     * @throws IOException if the file cannot be opened
+     */
+    public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        return new RandomAccessFile(file, "r");
+    }
+
+    /**
+     * Closes a handle previously returned by {@link #openRandomAccessFile(boolean)}.
+     *
+     * @param file the handle to close
+     * @throws IOException if closing fails
+     */
+    public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    /**
+     * Always fails: a read only data file cannot be deleted.
+     *
+     * @throws RuntimeException always
+     */
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+
+    /**
+     * Always fails: a read only data file cannot be moved.
+     *
+     * @throws RuntimeException always
+     */
+    public synchronized void move(File targetDirectory) throws IOException{
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/package.html?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/package.html Fri Jul 18 08:49:48 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ Journal-based data storage - scalable and fast.
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/BaseContainerImpl.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/BaseContainerImpl.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/BaseContainerImpl.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.ContainerId;
+import org.apache.kahadb.RuntimeStoreException;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.DataManager;
+import org.apache.kahadb.impl.data.Item;
+import org.apache.kahadb.impl.index.DiskIndexLinkedList;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.impl.index.VMIndexLinkedList;
+
+/**
+ * Implementation of a ListContainer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public abstract class BaseContainerImpl {
+
+    private static final Log LOG = LogFactory.getLog(BaseContainerImpl.class);
+    protected IndexItem root;
+    protected IndexLinkedList indexList;
+    protected IndexManager indexManager;
+    protected DataManager dataManager;
+    protected ContainerId containerId;
+    protected boolean loaded;
+    protected boolean closed;
+    protected boolean initialized;
+    protected boolean persistentIndex;
+
+    protected BaseContainerImpl(ContainerId id, IndexItem root, IndexManager indexManager, DataManager dataManager, boolean persistentIndex) {
+        this.containerId = id;
+        this.root = root;
+        this.indexManager = indexManager;
+        this.dataManager = dataManager;
+        this.persistentIndex = persistentIndex;
+    }
+
+    public ContainerId getContainerId() {
+        return containerId;
+    }
+
+    public synchronized void init() {
+        if (!initialized) {
+            if (!initialized) {
+                initialized = true;
+                if (this.indexList == null) {
+                    if (persistentIndex) {
+                        this.indexList = new DiskIndexLinkedList(indexManager, root);
+                    } else {
+                        this.indexList = new VMIndexLinkedList(root);
+                    }
+                }
+            }
+        }
+    }
+
+    public synchronized void clear() {
+        if (indexList != null) {
+            indexList.clear();
+        }
+    }
+
+    /**
+     * @return the indexList
+     */
+    public IndexLinkedList getList() {
+        return indexList;
+    }
+
+    /**
+     * @param indexList the indexList to set
+     */
+    public void setList(IndexLinkedList indexList) {
+        this.indexList = indexList;
+    }
+
+    public abstract void unload();
+
+    public abstract void load();
+
+    public abstract int size();
+
+    protected abstract Object getValue(StoreEntry currentItem);
+
+    protected abstract void remove(IndexItem currentItem);
+
+    protected final synchronized IndexLinkedList getInternalList() {
+        return indexList;
+    }
+
+    public final synchronized void close() {
+        unload();
+        closed = true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#isLoaded()
+     */
+    public final synchronized boolean isLoaded() {
+        checkClosed();
+        return loaded;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#getId()
+     */
+    public final Object getId() {
+        checkClosed();
+        return containerId.getKey();
+    }
+
+    public DataManager getDataManager() {
+        return dataManager;
+    }
+
+    public IndexManager getIndexManager() {
+        return indexManager;
+    }
+
+    public final synchronized void expressDataInterest() throws IOException {
+        long nextItem = root.getNextItem();
+        while (nextItem != Item.POSITION_NOT_SET) {
+            IndexItem item = indexManager.getIndex(nextItem);
+            item.setOffset(nextItem);
+            dataManager.addInterestInFile(item.getKeyFile());
+            dataManager.addInterestInFile(item.getValueFile());
+            nextItem = item.getNextItem();
+        }
+    }
+
+    protected final void doClear() {
+        checkClosed();
+        loaded = true;
+        List<IndexItem> indexList = new ArrayList<IndexItem>();
+        try {
+            init();
+            long nextItem = root.getNextItem();
+            while (nextItem != Item.POSITION_NOT_SET) {
+                IndexItem item = new IndexItem();
+                item.setOffset(nextItem);
+                indexList.add(item);
+                nextItem = item.getNextItem();
+            }
+            root.setNextItem(Item.POSITION_NOT_SET);
+            storeIndex(root);
+            for (int i = 0; i < indexList.size(); i++) {
+                IndexItem item = indexList.get(i);
+                dataManager.removeInterestInFile(item.getKeyFile());
+                dataManager.removeInterestInFile(item.getValueFile());
+                indexManager.freeIndex(item);
+            }
+            indexList.clear();
+        } catch (IOException e) {
+            LOG.error("Failed to clear Container " + getId(), e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    protected final void delete(final IndexItem keyItem, final IndexItem prevItem, final IndexItem nextItem) {
+        if (keyItem != null) {
+            try {
+                root = indexList.getRoot();
+                IndexItem prev = prevItem == null ? root : prevItem;
+                IndexItem next = (nextItem == null || !nextItem.equals(root)) ? nextItem : null;
+                dataManager.removeInterestInFile(keyItem.getKeyFile());
+                dataManager.removeInterestInFile(keyItem.getValueFile());
+                if (next != null) {
+                    prev.setNextItem(next.getOffset());
+                    next.setPreviousItem(prev.getOffset());
+                    updateIndexes(next);
+                } else {
+                    prev.setNextItem(Item.POSITION_NOT_SET);
+                }
+                updateIndexes(prev);
+                indexManager.freeIndex(keyItem);
+            } catch (IOException e) {
+                LOG.error("Failed to delete " + keyItem, e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+    }
+
+    protected final void checkClosed() {
+        if (closed) {
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+
+    protected void storeIndex(IndexItem item) throws IOException {
+        indexManager.storeIndex(item);
+    }
+
+    protected void updateIndexes(IndexItem item) throws IOException {
+        indexManager.updateIndexes(item);
+    }
+
+    protected final boolean isRoot(StoreEntry item) {
+        return item != null && root != null && (root == item || root.getOffset() == item.getOffset());
+        // return item != null && indexRoot != null && indexRoot == item;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerCollectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerCollectionSupport.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerCollectionSupport.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerCollectionSupport.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+/**
+ * Base class for container collections
+ * 
+ * @version $Revision: 1.2 $
+ */
+class ContainerCollectionSupport {
+
+    /** The container that size and emptiness queries are delegated to. */
+    protected MapContainerImpl container;
+
+    /**
+     * @param container the container this collection delegates to
+     */
+    protected ContainerCollectionSupport(MapContainerImpl container) {
+        this.container = container;
+    }
+
+    /**
+     * @return the size reported by the container
+     */
+    public int size() {
+        final int count = this.container.size();
+        return count;
+    }
+
+    /**
+     * @return whether the container reports itself as empty
+     */
+    public boolean isEmpty() {
+        final boolean empty = this.container.isEmpty();
+        return empty;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySet.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySet.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySet.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Set of Map.Entry objects for a container
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySet extends ContainerCollectionSupport implements Set {
+    ContainerEntrySet(MapContainerImpl container) {
+        super(container);
+    }
+
+    /**
+     * @return true if a snapshot of the container's entries contains o
+     */
+    public boolean contains(Object o) {
+        // Uses the same snapshot as containsAll() and remove() so that all
+        // membership checks of this class agree.
+        return buildEntrySet().contains(o);
+    }
+
+    public Iterator iterator() {
+        return new ContainerEntrySetIterator(container, buildEntrySet().iterator());
+    }
+
+    public Object[] toArray() {
+        return buildEntrySet().toArray();
+    }
+
+    public Object[] toArray(Object[] a) {
+        return buildEntrySet().toArray(a);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    public boolean add(Object o) {
+        throw new UnsupportedOperationException("Cannot add here");
+    }
+
+    /**
+     * Removes the key of the given entry from the container.
+     *
+     * @return true if the entry was part of this set and has been removed
+     */
+    public boolean remove(Object o) {
+        boolean result = false;
+        if (buildEntrySet().remove(o)) {
+            ContainerMapEntry entry = (ContainerMapEntry)o;
+            container.remove(entry.getKey());
+            // Report the change; the flag was previously never set.
+            result = true;
+        }
+        return result;
+    }
+
+    public boolean containsAll(Collection c) {
+        return buildEntrySet().containsAll(c);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    public boolean addAll(Collection c) {
+        throw new UnsupportedOperationException("Cannot add here");
+    }
+
+    /**
+     * Removes every entry of this set that is not contained in c.
+     *
+     * @return true if at least one entry was removed
+     */
+    public boolean retainAll(Collection c) {
+        boolean changed = false;
+        // Iterate over a snapshot so the container can be modified safely.
+        for (ContainerMapEntry entry : buildEntrySet()) {
+            if (!c.contains(entry)) {
+                container.remove(entry.getKey());
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Removes every element of c from this set.
+     *
+     * @return true if at least one entry was removed
+     */
+    public boolean removeAll(Collection c) {
+        boolean changed = false;
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            changed |= remove(i.next());
+        }
+        return changed;
+    }
+
+    public void clear() {
+        container.clear();
+    }
+
+    /**
+     * @return a new set holding one ContainerMapEntry per key of the container
+     */
+    protected Set<ContainerMapEntry> buildEntrySet() {
+        Set<ContainerMapEntry> set = new HashSet<ContainerMapEntry>();
+        for (Iterator i = container.keySet().iterator(); i.hasNext();) {
+            ContainerMapEntry entry = new ContainerMapEntry(container, i.next());
+            set.add(entry);
+        }
+        return set;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySetIterator.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySetIterator.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerEntrySetIterator.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.Iterator;
+
+/**
+ * An Iterator for a container entry Set
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerEntrySetIterator implements Iterator {
+    private final MapContainerImpl container;
+    private final Iterator iter;
+    /** Entry handed out by the most recent call to next(), if any. */
+    private ContainerMapEntry currentEntry;
+
+    /**
+     * @param container the container that remove() operates on
+     * @param iter the iterator that supplies the entries
+     */
+    ContainerEntrySetIterator(MapContainerImpl container, Iterator iter) {
+        this.container = container;
+        this.iter = iter;
+    }
+
+    public boolean hasNext() {
+        return this.iter.hasNext();
+    }
+
+    public Object next() {
+        final ContainerMapEntry entry = (ContainerMapEntry)this.iter.next();
+        this.currentEntry = entry;
+        return entry;
+    }
+
+    /**
+     * Removes the key of the last returned entry from the container; does
+     * nothing when no entry has been returned yet.
+     */
+    public void remove() {
+        if (this.currentEntry == null) {
+            return;
+        }
+        this.container.remove(this.currentEntry.getKey());
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySet.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySet.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySet.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kahadb.impl.index.IndexItem;
+
+/**
+ * A Set of keys for the container
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySet extends ContainerCollectionSupport implements Set {
+
+    ContainerKeySet(MapContainerImpl container) {
+        super(container);
+    }
+
+    public boolean contains(Object o) {
+        return container.containsKey(o);
+    }
+
+    public Iterator iterator() {
+        return new ContainerKeySetIterator(container);
+    }
+
+    public Object[] toArray() {
+        List<Object> list = new ArrayList<Object>();
+        IndexItem item = container.getInternalList().getRoot();
+        while ((item = container.getInternalList().getNextEntry(item)) != null) {
+            list.add(container.getKey(item));
+        }
+        return list.toArray();
+    }
+
+    public Object[] toArray(Object[] a) {
+        List<Object> list = new ArrayList<Object>();
+        IndexItem item = container.getInternalList().getRoot();
+        while ((item = container.getInternalList().getNextEntry(item)) != null) {
+            list.add(container.getKey(item));
+        }
+        return list.toArray(a);
+    }
+
+    public boolean add(Object o) {
+        throw new UnsupportedOperationException("Cannot add here");
+    }
+
+    public boolean remove(Object o) {
+        return container.remove(o) != null;
+    }
+
+    public boolean containsAll(Collection c) {
+        for (Object key : c) {
+            if (!container.containsKey(key)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public boolean addAll(Collection c) {
+        throw new UnsupportedOperationException("Cannot add here");
+    }
+
+    public boolean retainAll(Collection c) {
+        List<Object> tmpList = new ArrayList<Object>();
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            Object o = i.next();
+            if (!contains(o)) {
+                tmpList.add(o);
+            }
+        }
+        for (Iterator<Object> i = tmpList.iterator(); i.hasNext();) {
+            remove(i.next());
+        }
+        return !tmpList.isEmpty();
+    }
+
+    public boolean removeAll(Collection c) {
+        boolean result = true;
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            if (!remove(i.next())) {
+                result = false;
+            }
+        }
+        return result;
+    }
+
+    public void clear() {
+        container.clear();
+    }
+
+    public String toString() {
+        StringBuffer result = new StringBuffer(32);
+        result.append("ContainerKeySet[");
+        IndexItem item = container.getInternalList().getRoot();
+        while ((item = container.getInternalList().getNextEntry(item)) != null) {
+            result.append(container.getKey(item));
+            result.append(",");
+        }
+        result.append("]");
+        return result.toString();
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySetIterator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySetIterator.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySetIterator.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerKeySetIterator.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.Iterator;
+
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+
+/**
+ * Iterator over the keys of a map container, walking the container's
+ * internal index list entry by entry.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerKeySetIterator implements Iterator {
+
+    /** Entry whose key the next call to next() will return; null at the end. */
+    protected IndexItem nextItem;
+    /** Entry most recently visited; initially the list root. */
+    protected IndexItem currentItem;
+
+    private MapContainerImpl container;
+    private IndexLinkedList list;
+
+    /**
+     * Positions the iterator on the root of the container's internal list,
+     * with the entry following the root queued up as the next one.
+     * 
+     * @param container the container whose keys are iterated
+     */
+    ContainerKeySetIterator(MapContainerImpl container) {
+        IndexLinkedList internal = container.getInternalList();
+        IndexItem root = internal.getRoot();
+        this.container = container;
+        this.list = internal;
+        this.currentItem = root;
+        this.nextItem = internal.getNextEntry(root);
+    }
+
+    /**
+     * @return true while an entry is queued up
+     */
+    public boolean hasNext() {
+        return nextItem != null;
+    }
+
+    /**
+     * Makes the queued entry current, looks up its key and queues the entry
+     * that follows it in the list.
+     * 
+     * @return the key of the entry that was queued
+     */
+    public Object next() {
+        IndexItem item = nextItem;
+        currentItem = item;
+        Object key = container.getKey(item);
+        nextItem = list.getNextEntry(item);
+        return key;
+    }
+
+    /**
+     * Asks the container to remove the current entry and, when another entry
+     * is queued, asks the list to refresh it. Does nothing when there is no
+     * current entry.
+     */
+    public void remove() {
+        if (currentItem == null) {
+            return;
+        }
+        container.remove(currentItem);
+        if (nextItem != null) {
+            list.refreshEntry(nextItem);
+        }
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerListIterator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerListIterator.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerListIterator.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerListIterator.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.ListIterator;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator {
+
+    protected ContainerListIterator(ListContainerImpl container, IndexLinkedList list, IndexItem start) {
+        super(container, list, start);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#hasPrevious()
+     */
+    public boolean hasPrevious() {
+        synchronized (container) {
+            nextItem = (IndexItem)list.refreshEntry(nextItem);
+            return list.getPrevEntry(nextItem) != null;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#previous()
+     */
+    public Object previous() {
+        synchronized (container) {
+            nextItem = (IndexItem)list.refreshEntry(nextItem);
+            nextItem = list.getPrevEntry(nextItem);
+            return nextItem != null ? container.getValue(nextItem) : null;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#nextIndex()
+     */
+    public int nextIndex() {
+        int result = -1;
+        if (nextItem != null) {
+            synchronized (container) {
+                nextItem = (IndexItem)list.refreshEntry(nextItem);
+                StoreEntry next = list.getNextEntry(nextItem);
+                if (next != null) {
+                    result = container.getInternalList().indexOf(next);
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#previousIndex()
+     */
+    public int previousIndex() {
+        int result = -1;
+        if (nextItem != null) {
+            synchronized (container) {
+                nextItem = (IndexItem)list.refreshEntry(nextItem);
+                StoreEntry prev = list.getPrevEntry(nextItem);
+                if (prev != null) {
+                    result = container.getInternalList().indexOf(prev);
+                }
+            }
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#set(E)
+     */
+    public void set(Object o) {
+        IndexItem item = ((ListContainerImpl)container).internalSet(previousIndex() + 1, o);
+        nextItem = item;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#add(E)
+     */
+    public void add(Object o) {
+        IndexItem item = ((ListContainerImpl)container).internalAdd(previousIndex() + 1, o);
+        nextItem = item;
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerMapEntry.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerMapEntry.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerMapEntry.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.Map;
+
+import org.apache.kahadb.MapContainer;
+
+/**
+ * Map.Entry implementation for a container
+ * 
+ * @version $Revision: 1.2 $
+ */
+class ContainerMapEntry implements Map.Entry {
+
+    private MapContainer container;
+    private Object key;
+
+    ContainerMapEntry(MapContainer container, Object key) {
+        this.container = container;
+        this.key = key;
+
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    public Object getValue() {
+        return container.get(key);
+    }
+
+    public Object setValue(Object value) {
+        return container.put(key, value);
+    }
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerMapEntry.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollection.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollection.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollection.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+
+/**
+ * Collection view over the values of a MapContainerImpl.
+ * <p>
+ * Lookups, removals and clear() are delegated to the backing container;
+ * adding values through this view is not supported.
+ * 
+ * @version $Revision: 1.2 $
+ */
+class ContainerValueCollection extends ContainerCollectionSupport implements Collection {
+
+    /**
+     * @param container the container whose values this collection exposes
+     */
+    ContainerValueCollection(MapContainerImpl container) {
+        super(container);
+    }
+
+    /**
+     * @return true if the container reports that it holds the value
+     */
+    public boolean contains(Object o) {
+        return container.containsValue(o);
+    }
+
+    /**
+     * @return a new iterator over the container's item list, starting at its root
+     */
+    public Iterator iterator() {
+        IndexLinkedList list = container.getItemList();
+        return new ContainerValueCollectionIterator(container, list, list.getRoot());
+    }
+
+    /**
+     * @return a newly allocated array with every value in item list order
+     */
+    public Object[] toArray() {
+        Object[] result = null;
+        IndexLinkedList list = container.getItemList();
+        // The item list is locked for the duration of the copy.
+        synchronized (list) {
+            result = new Object[list.size()];
+            IndexItem item = list.getFirst();
+            int count = 0;
+            while (item != null) {
+                Object value = container.getValue(item);
+                result[count++] = value;
+
+                item = list.getNextEntry(item);
+            }
+
+        }
+        return result;
+    }
+
+    /**
+     * Copies every value into the supplied array. When the array is too
+     * small a new array of the same component type is allocated; when it is
+     * larger than needed, the slot right after the last value is set to null,
+     * following the Collection.toArray(Object[]) contract.
+     * 
+     * @param result the array to fill if it is big enough
+     * @return the array holding the values
+     */
+    public Object[] toArray(Object[] result) {
+        IndexLinkedList list = container.getItemList();
+        synchronized (list) {
+            int size = list.size();
+            Object[] target = result;
+            if (target.length < size) {
+                // Supplied array cannot hold everything: allocate one of the
+                // same runtime type instead of overrunning it.
+                target = (Object[])java.lang.reflect.Array.newInstance(result.getClass().getComponentType(), size);
+            }
+            IndexItem item = list.getFirst();
+            int count = 0;
+            while (item != null) {
+                Object value = container.getValue(item);
+                target[count++] = value;
+
+                item = list.getNextEntry(item);
+            }
+            if (count < target.length) {
+                target[count] = null;
+            }
+            return target;
+        }
+    }
+
+    /**
+     * Always fails: values cannot be added through this view.
+     * 
+     * @throws UnsupportedOperationException always
+     */
+    public boolean add(Object o) {
+        throw new UnsupportedOperationException("Can't add an object here");
+    }
+
+    /**
+     * @return the result of the container's removeValue for the object
+     */
+    public boolean remove(Object o) {
+        return container.removeValue(o);
+    }
+
+    /**
+     * @return true if every element of c is contained in this collection
+     *         (trivially true for an empty collection, matching
+     *         ContainerKeySet.containsAll)
+     */
+    public boolean containsAll(Collection c) {
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            if (!contains(i.next())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Always fails: values cannot be added through this view.
+     * 
+     * @throws UnsupportedOperationException always
+     */
+    public boolean addAll(Collection c) {
+        throw new UnsupportedOperationException("Can't add everything here!");
+    }
+
+    /**
+     * Removes every element of c from this collection.
+     * 
+     * @param c the values to remove
+     * @return true if at least one removal succeeded, as required by the
+     *         Collection contract
+     */
+    public boolean removeAll(Collection c) {
+        boolean changed = false;
+        for (Iterator i = c.iterator(); i.hasNext();) {
+            Object obj = i.next();
+            if (remove(obj)) {
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Removes every value of this collection that is not contained in c.
+     * 
+     * @param c the values to keep
+     * @return true if at least one value was removed
+     */
+    public boolean retainAll(Collection c) {
+        boolean changed = false;
+        // Work from a snapshot so removals cannot disturb the traversal.
+        Object[] snapshot = toArray();
+        for (int i = 0; i < snapshot.length; i++) {
+            Object value = snapshot[i];
+            if (!c.contains(value) && remove(value)) {
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Clears the backing container.
+     */
+    public void clear() {
+        container.clear();
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollectionIterator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollectionIterator.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollectionIterator.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/ContainerValueCollectionIterator.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.container;
+
+import java.util.Iterator;
+
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexLinkedList;
+
+/**
+ * Iterator over the values of a container, walking an IndexLinkedList.
+ * Entries are refreshed from the list before they are used, and next() and
+ * remove() run while holding the container's monitor.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class ContainerValueCollectionIterator implements Iterator {
+
+    protected BaseContainerImpl container;
+    protected IndexLinkedList list;
+    /** Entry whose value the next call to next() will return; null at the end. */
+    protected IndexItem nextItem;
+    /** Entry most recently visited; initially the start entry. */
+    protected IndexItem currentItem;
+
+    /**
+     * Starts on the given entry and queues the entry that follows its
+     * refreshed version.
+     * 
+     * @param container the container values are read from and removed from
+     * @param list the index list to walk
+     * @param start the entry to start from
+     */
+    ContainerValueCollectionIterator(BaseContainerImpl container, IndexLinkedList list, IndexItem start) {
+        this.container = container;
+        this.list = list;
+        this.currentItem = start;
+        IndexItem refreshedStart = (IndexItem)list.refreshEntry(start);
+        this.nextItem = list.getNextEntry(refreshedStart);
+    }
+
+    /**
+     * @return true while an entry is queued up
+     */
+    public boolean hasNext() {
+        return nextItem != null;
+    }
+
+    /**
+     * Refreshes the queued entry, makes it current, reads its value from the
+     * container and queues the entry that follows it.
+     * 
+     * @return the value of the entry that was queued
+     */
+    public Object next() {
+        synchronized (container) {
+            IndexItem item = (IndexItem)list.refreshEntry(nextItem);
+            nextItem = item;
+            currentItem = item;
+            Object value = container.getValue(item);
+            nextItem = list.getNextEntry(item);
+            return value;
+        }
+    }
+
+    /**
+     * Refreshes the current entry and asks the container to remove it. Does
+     * nothing when there is no current entry.
+     */
+    public void remove() {
+        synchronized (container) {
+            if (currentItem == null) {
+                return;
+            }
+            currentItem = (IndexItem)list.refreshEntry(currentItem);
+            container.remove(currentItem);
+        }
+    }
+}



Mime
View raw message