Subject: svn commit: r687919 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/
Date: Fri, 22 Aug 2008 01:16:06 -0000
To: commits@activemq.apache.org
From: chirino@apache.org
Message-Id: <20080822011607.259D3238899E@eris.apache.org>

Author: chirino
Date: Thu Aug 21 18:16:06 2008
New Revision: 687919

URL: http://svn.apache.org/viewvc?rev=687919&view=rev
Log:
Added a HashIndex implementation that uses the PageFile layer.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kahadb.page; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.LinkedList; + +import org.apache.kahadb.Marshaller; +import org.apache.kahadb.util.ByteArrayOutputStream; +import org.apache.kahadb.util.ByteSequence; + +/** + * Represents a Chunk of data in a chunk stream. Use the PageInputStream and PageOnputStream classes to access + * a linked set of chunks on a PageFile. + * + */ +public class Chunk extends ByteSequence { + + static final int HEADER_MAX_SIZE=9; + + boolean last; + long next; + + public Chunk() { + super(); + } + + public Chunk(byte[] data, int offset, int length) { + super(data, offset, length); + } + + public Chunk(byte[] data) { + super(data); + } + + @Override + public String toString() { + return "Chunk{length: "+length+", last; "+last+", next:"+next+"}"; + } + + public static class ChunkMarshaller implements Marshaller { + private final int chunkSize; + + public ChunkMarshaller(int chunkSize) { + this.chunkSize = chunkSize; + } + + public Class getType() { + return Chunk.class; + } + + public void writePayload(Chunk chunk, DataOutput out) throws IOException { + if( chunk.last ) { + out.writeBoolean(true); + out.writeInt(chunk.length); + out.write(chunk.data, chunk.offset, chunk.length); + } else { + out.writeBoolean(false); + out.writeLong(chunk.next); + out.write(chunk.data, chunk.offset, chunk.length); + } + } + + public Chunk readPayload(DataInput in) throws IOException { + Chunk chunk = new Chunk(); + if( in.readBoolean() ) { + chunk.last=true; + chunk.length = in.readInt(); + chunk.data = new byte[chunk.length]; + chunk.next=0; + in.readFully(chunk.data); + } else { + chunk.last=false; + chunk.next = in.readLong(); + chunk.length = chunkSize; + chunk.data = new byte[chunkSize]; + in.readFully(chunk.data); + } + return chunk; + } + + public int getChunkSize() { + return chunkSize; + } + + } + + public static class PageInputStream extends InputStream { + + private PageFile file; + private Chunk chunk; + private int pos; + private int pageCount; + + private int markPos; + private Chunk markChunk; + private int markPageCount; + private ChunkMarshaller marshaller; + + public PageInputStream(PageFile file, long pageId) throws IOException { + this.file = file; + this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE); + + Page page = file.load(pageId, marshaller); + if( page.getType() != Page.CHUNK_TYPE ) { + throw new EOFException("Chunk stream does not exist at page: "+pageId); + } + chunk = (Chunk)page.getData(); + pageCount++; + + } + + public int read() throws IOException { + if (!atEOF()) { + return chunk.data[chunk.offset+pos++] & 0xff; + } else { + return -1; + } + } + + private boolean atEOF() throws IOException { + if( pos < chunk.length ) { + return false; + } + 
if( chunk.last ) { + return true; + } + fill(); + return pos >= chunk.length; + } + + private void fill() throws IOException { + Page page = file.load(chunk.next, marshaller); + if( page.getType() == Page.INVALID_TYPE ) { + throw new IOException("Invalid page: "+chunk.next); + } + chunk = (Chunk)page.getData(); + pageCount++; + pos = 0; + } + + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public int read(byte b[], int off, int len) throws IOException { + if (!atEOF()) { + int rc=0; + while(!atEOF() && rc < len) { + len = Math.min(len, chunk.length - pos); + if (len > 0) { + System.arraycopy(chunk.data, chunk.offset+pos, b, off, len); + pos += len; + } + rc+=len; + } + return rc; + } else { + return -1; + } + } + + public long skip(long len) throws IOException { + if (atEOF()) { + int rc=0; + while(!atEOF() && rc < len) { + len = Math.min(len, chunk.length - pos); + if (len > 0) { + pos += len; + } + rc+=len; + } + return rc; + } else { + return -1; + } + } + + public int available() { + return chunk.length - pos; + } + + public boolean markSupported() { + return true; + } + + public void mark(int markpos) { + markPos = pos; + markChunk = chunk; + markPageCount = pageCount; + } + + public void reset() { + pos = markPos; + chunk = markChunk; + pageCount = markPageCount; + } + + public int getPageCount() { + return pageCount; + } + + } + + static public class PageOutputStream extends ByteArrayOutputStream { + + private PageFile file; + private long pageId; + private ChunkMarshaller marshaller; + private int pageCount; + private ArrayList pages; + + public PageOutputStream(PageFile file, long pageId) { + this.file = file; + this.pageId = pageId; + this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE); + } + + @Override + public void close() throws IOException { + super.close(); + + ArrayList chunks = new ArrayList(); + ByteSequence bs = toByteSequence(); + + int pos = 0; + while( pos < bs.length ) { + int len = Math.min(marshaller.getChunkSize(), bs.length - pos); + Chunk c = new Chunk(bs.data, pos, len); + chunks.add(c); + pos+=len; + } + if( chunks.isEmpty() ) { + Chunk c = new Chunk(new byte[]{}); + chunks.add(c); + } + chunks.get(chunks.size()-1).last = true; + + // Load the old pages.. + pages = new ArrayList(); + long p = pageId; + while( p >= 0 ) { + Page page = file.load(p, marshaller); + Chunk c = (Chunk)page.getData(); + if( c!=null && !c.last ) { + p = c.next; + } else { + p = -1; + } + pages.add(page); + } + + // Add more if needed. + while( pages.size() < chunks.size() ) { + pages.add(file.allocate()); + } + + // Update the page data. + for(int i=0; i < chunks.size(); i++) { + Chunk chunk = chunks.get(i); + Page page = pages.get(i); + page.setType(Page.CHUNK_TYPE); + page.setData(chunk); + if( !chunk.last ) { + chunk.next = pages.get(i+1).getPageId(); + } + } + + // If there were extra pages.. Free them up. 
+ for(int i=chunks.size(); i < pages.size(); i++) { + Page page = pages.get(i); + page.setData(null); + page.setType(Page.FREE_TYPE); + } + + file.write(pages, marshaller); + + pageCount=chunks.size(); + } + + public int getPageCount() { + return pageCount; + } + + } + +} \ No newline at end of file Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=687919&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Thu Aug 21 18:16:06 2008 @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.page; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.kahadb.page.Chunk.PageInputStream; +import org.apache.kahadb.page.Chunk.PageOutputStream; + +/** + * Bin in a HashIndex + * + * @version $Revision: 1.1.1.1 $ + */ +class HashBin { + + private final HashIndex index; + private final long pageId; + private TreeMap data; + private int pageCount; + + /** + * Constructor + * + * @param hashIndex + * @param pageId + * @param maximumEntries + * @throws IOException + */ + HashBin(HashIndex hashIndex, long pageId) throws IOException { + this.index = hashIndex; + this.pageId = pageId; + } + + private void load() throws IOException { + + data = new TreeMap(); + + // Using page page streams to store the data makes it easy to marshall the HashBin data, + // but it does not give us very good page based caching. As even if the pages are cached, + // we will still need to de-marshall from the stream. + + // I think it will be better to make the bin a btree root. 
+ + PageInputStream pis = new PageInputStream(index.getPageFile(), pageId); + DataInputStream is = new DataInputStream(pis); + try { + + int size = is.readInt(); + for(int i=0; i < size; i++) { + Comparable key = (Comparable)index.getKeyMarshaller().readPayload(is); + long value = is.readLong(); + data.put(key, value); + } + is.close(); + pageCount = pis.getPageCount(); + } catch (IOException e) { + throw e; + } + } + + public void store() throws IOException { + PageOutputStream pos = new PageOutputStream(index.getPageFile(), pageId); + DataOutputStream os = new DataOutputStream(pos); + if( data == null ) { + os.writeInt(0); + } else { + os.writeInt(data.size()); + for (Map.Entry entry : data.entrySet()) { + index.getKeyMarshaller().writePayload(entry.getKey(), os); + os.writeLong(entry.getValue()); + } + } + os.close(); + pageCount = pos.getPageCount(); + } + + public int size() throws IOException { + if( data!=null ) { + return data.size(); + } else { + + // Peek at the page to see how many items it contains. + PageInputStream pis = new PageInputStream(index.getPageFile(), pageId); + DataInputStream is = new DataInputStream(pis); + int size = is.readInt(); + is.close(); + + return size; + } + } + + public Long put(Comparable key, Long value) throws IOException { + if( data==null ) { + load(); + } + Long rc = data.put(key, value); + if( !value.equals(rc) ) { + store(); + } + return rc; + } + + public Long find(Comparable key) throws IOException { + if( data==null ) { + load(); + } + return data.get(key); + } + + public Map getAll() throws IOException { + if( data==null ) { + load(); + } + return data; + } + + public Long remove(Comparable key) throws IOException { + if( data==null ) { + load(); + } + Long rc = data.remove(key); + if( rc!=null ) { + store(); + } + return rc; + } + + public long getPageId() { + return pageId; + } + +} Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=687919&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Thu Aug 21 18:16:06 2008 @@ -0,0 +1,456 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kahadb.page; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.Marshaller; +import org.apache.kahadb.StoreEntry; +import org.apache.kahadb.impl.index.Index; +import org.apache.kahadb.impl.index.IndexManager; +import org.apache.kahadb.page.Chunk.PageInputStream; +import org.apache.kahadb.page.Chunk.PageOutputStream; + +/** + * BTree implementation + * + * @version $Revision: 1.1.1.1 $ + */ +public class HashIndex implements Index { + + public static final int CLOSED_STATE = 1; + public static final int OPEN_STATE = 2; + public static final int INITIALIZING_STATE = 3; + + public static final int RESIZING_PHASE1_STATE = 4; + public static final int RESIZING_PHASE2_STATE = 5; + + private static final Log LOG = LogFactory.getLog(HashIndex.class); + + public static final int DEFAULT_BIN_CAPACITY; + public static final int DEFAULT_MAXIMUM_BIN_CAPACITY; + public static final int DEFAULT_MINIMUM_BIN_CAPACITY; + public static final int DEFAULT_LOAD_FACTOR; + + static { + DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024")); + DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384")); + DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16")); + DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75")); + } + + private IndexManager indexManager; + + private Marshaller keyMarshaller; + private AtomicBoolean loaded = new AtomicBoolean(); + + private int size; + + private int increaseThreshold; + private int decreaseThreshold; + + // Where the bin page array starts at. + private long binPageId; + private int binCapacity = DEFAULT_BIN_CAPACITY; + private int binsActive; + private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY; + private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY; + + // While resizing, the following contains the new resize data. + private int resizeCapacity; + private long resizePageId; + + // When the index is initializing or resizing.. state changes so that + // on failure it can be properly recovered. + private int state; + + // Once binsActive/binCapacity reaches the loadFactor, then we need to + // increase the capacity + private int loadFactor = DEFAULT_LOAD_FACTOR; + + private PageFile pageFile; + // This page holds the index metadata. + private long pageId; + + /** + * Constructor + * + * @param directory + * @param name + * @param indexManager + * @param numberOfBins + * @throws IOException + */ + public HashIndex(IndexManager indexManager, PageFile pageFile, long pageId) throws IOException { + this.pageFile = pageFile; + this.indexManager = indexManager; + this.pageId = pageId; + } + + public synchronized void load() { + if (loaded.compareAndSet(false, true)) { + try { + Page page = pageFile.load(pageId, null); + + // Is this a brand new index? + if (page.getType() == Page.FREE_TYPE) { + + // We need to create the pages for the bins + Page binPage = pageFile.allocate(binCapacity); + binPageId = binPage.getPageId(); + state = INITIALIZING_STATE; + storeMetadata(); + pageFile.checkpoint(); + + // If failure happens now we can continue initializing the + // the hash bins... + + } else { + // Lets load it's data + loadMetadata(); + + // If we did not have a clean shutdown... 
+ if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) { + // Figure out the size and the # of bins that are + // active. Yeah This loads the first page of every bin. :( + // We might want to put this in the metadata page, but + // then that page would be getting updated on every write. + size = 0; + for (int i = 0; i < binCapacity; i++) { + HashBin hashBin = new HashBin(this, binPageId + i); + int t = hashBin.size(); + if (t > 0) { + binsActive++; + } + size += t; + } + } + } + + if (state == INITIALIZING_STATE) { + // TODO: + // If a failure occurs mid way through us initializing the + // bins.. will the page file still think we have the rest + // of them previously allocated to us? + + for (int i = 0; i < binCapacity; i++) { + HashBin hashBin = new HashBin(this, binPageId + i); + hashBin.store(); + } + size = 0; + binsActive = 0; + } + + if (state == RESIZING_PHASE1_STATE) { + // continue resize phase 1 + resizePhase1(); + } + if (state == RESIZING_PHASE2_STATE) { + // continue resize phase 1 + resizePhase2(); + } + + calcThresholds(); + + state = OPEN_STATE; + storeMetadata(); + pageFile.checkpoint(); + + LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public synchronized void unload() throws IOException { + if (loaded.compareAndSet(true, false)) { + state = CLOSED_STATE; + storeMetadata(); + } + } + + public synchronized StoreEntry get(Object key) throws IOException { + load(); + Long result = getBin(key).find((Comparable)key); + return result != null ? indexManager.getIndex(result) : null; + } + + public synchronized void store(Object key, StoreEntry value) throws IOException { + load(); + HashBin bin = getBin(key); + if (bin.put((Comparable)key, value.getOffset()) == null) { + this.size++; + if (bin.size() == 1) { + binsActive++; + } + } + if (this.binsActive >= this.increaseThreshold) { + int newSize = Math.min(maximumBinCapacity, binCapacity*2); + if(binCapacity!=newSize) { + resize(newSize); + } + } + } + + public synchronized StoreEntry remove(Object key) throws IOException { + load(); + StoreEntry result = null; + + HashBin bin = getBin(key); + Long offset = bin.remove((Comparable)key); + if (offset != null) { + this.size--; + if (bin.size() == 0) { + binsActive--; + } + result = this.indexManager.getIndex(offset); + } + + if (this.binsActive <= this.decreaseThreshold) { + int newSize = Math.max(minimumBinCapacity, binCapacity/2); + if(binCapacity!=newSize) { + resize(newSize); + + } + } + return result; + } + + public synchronized boolean containsKey(Object key) throws IOException { + return get(key) != null; + } + + public synchronized void clear() throws IOException { + load(); + for (int i = 0; i < binCapacity; i++) { + HashBin hashBin = new HashBin(this, binPageId + i); + hashBin.store(); // A store before a load.. clears the data out. 
+ } + size = 0; + binsActive = 0; + } + + public String toString() { + String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile; + return str; + } + + // ///////////////////////////////////////////////////////////////// + // Implementation Methods + // ///////////////////////////////////////////////////////////////// + + private void loadMetadata() throws IOException { + PageInputStream pis = new PageInputStream(pageFile, pageId); + DataInputStream is = new DataInputStream(pis); + state = is.readInt(); + binPageId = is.readLong(); + binCapacity = is.readInt(); + size = is.readInt(); + binsActive = is.readInt(); + resizePageId = is.readLong(); + resizeCapacity = is.readInt(); + is.close(); + } + + private void storeMetadata() throws IOException { + PageOutputStream pos = new PageOutputStream(pageFile, pageId); + DataOutputStream os = new DataOutputStream(pos); + os.writeInt(state); + os.writeLong(binPageId); + os.writeInt(binCapacity); + os.writeInt(size); + os.writeInt(binsActive); + os.writeLong(resizePageId); + os.writeInt(resizeCapacity); + os.close(); + } + + private void resize(int newSize) throws IOException { + + LOG.debug("Resizing to: "+newSize); + + state = RESIZING_PHASE1_STATE; + resizeCapacity = newSize; + resizePageId = pageFile.allocate(resizeCapacity).getPageId(); + storeMetadata(); + pageFile.checkpoint(); + resizePhase1(); + resizePhase2(); + } + + private void resizePhase1() throws IOException { + // In Phase 1 we copy the data to the new bins.. + + // Initialize the bins.. + for (int i = 0; i < resizeCapacity; i++) { + HashBin bin = new HashBin(this, resizePageId + i); + bin.store(); + } + + binsActive = 0; + // Copy the data from the old bins to the new bins. + for (int i = 0; i < binCapacity; i++) { + HashBin bin = new HashBin(this, binPageId + i); + for (Map.Entry entry : bin.getAll().entrySet()) { + HashBin resizeBin = getResizeBin(entry.getKey()); + resizeBin.put(entry.getKey(), entry.getValue()); + if( resizeBin.size() == 1) { + binsActive++; + } + } + } + + // Now we can release the old data. + state = RESIZING_PHASE2_STATE; + storeMetadata(); + pageFile.checkpoint(); + } + + private void resizePhase2() throws IOException { + for (int i = 0; i < binCapacity; i++) { + HashBin hashBin = new HashBin(this, binPageId + i); + hashBin.store(); // A store before a load.. clears the data out. + } + pageFile.free(binPageId, binCapacity); + + binCapacity = resizeCapacity; + binPageId = resizePageId; + resizeCapacity=0; + resizePageId=0; + state = OPEN_STATE; + storeMetadata(); + pageFile.checkpoint(); + calcThresholds(); + + LOG.debug("Resizing done. 
New bins start at: "+binPageId); + + } + + private void calcThresholds() { + increaseThreshold = (binCapacity * loadFactor)/100; + decreaseThreshold = (binCapacity * loadFactor * loadFactor ) / 20000; + } + + private HashBin getResizeBin(Object key) throws IOException { + int i = indexFor(key, resizeCapacity); + return new HashBin(this, resizePageId + i); + } + + private HashBin getBin(Object key) throws IOException { + int i = indexFor(key, binCapacity); + return new HashBin(this, binPageId + i); + } + + static int indexFor(Object x, int length) { + return Math.abs(x.hashCode()%length); + } + + // ///////////////////////////////////////////////////////////////// + // Property Accessors + // ///////////////////////////////////////////////////////////////// + + public Marshaller getKeyMarshaller() { + return keyMarshaller; + } + + /** + * Set the marshaller for key objects + * + * @param marshaller + */ + public synchronized void setKeyMarshaller(Marshaller marshaller) { + this.keyMarshaller = marshaller; + } + + /** + * @return number of bins in the index + */ + public int getBinCapacity() { + return this.binCapacity; + } + + /** + * @param binCapacity + */ + public void setBinCapacity(int binCapacity) { + if (loaded.get() && binCapacity != this.binCapacity) { + throw new RuntimeException("Pages already loaded - can't reset bin capacity"); + } + this.binCapacity = binCapacity; + } + + public boolean isTransient() { + return false; + } + + /** + * @return the loadFactor + */ + public int getLoadFactor() { + return loadFactor; + } + + /** + * @param loadFactor the loadFactor to set + */ + public void setLoadFactor(int loadFactor) { + this.loadFactor = loadFactor; + } + + /** + * @return the maximumCapacity + */ + public int setMaximumBinCapacity() { + return maximumBinCapacity; + } + + /** + * @param maximumCapacity the maximumCapacity to set + */ + public void setMaximumBinCapacity(int maximumCapacity) { + this.maximumBinCapacity = maximumCapacity; + } + + public synchronized int getSize() { + return size; + } + + public synchronized int getActiveBins() { + return binsActive; + } + + public long getBinPageId() { + return binPageId; + } + + public PageFile getPageFile() { + return pageFile; + } + + public int getBinsActive() { + return binsActive; + } + +} Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=687919&r1=687918&r2=687919&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Thu Aug 21 18:16:06 2008 @@ -39,6 +39,7 @@ public static final short INVALID_TYPE = -1; public static final short FREE_TYPE = 0; + public static final short CHUNK_TYPE = 1; private long pageId; @@ -54,12 +55,18 @@ this.type = other.type; this.data = other.data; } + + Page copy() { + Page rc = new Page(); + rc.copy(this); + return rc; + } void write(DataOutput os, Marshaller marshaller) throws IOException { os.writeShort(type); os.writeLong(txId); - if( marshaller!=null ) { + if( marshaller!=null && type!=FREE_TYPE ) { marshaller.writePayload(data, os); } } @@ -67,8 +74,10 @@ void read(DataInput is, Marshaller marshaller) throws IOException { type = is.readShort(); txId = is.readLong(); - if( marshaller!=null ) { + if( marshaller!=null && type!=FREE_TYPE ) { data = 
marshaller.readPayload(is); + } else { + data = null; } } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=687919&r1=687918&r2=687919&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Aug 21 18:16:06 2008 @@ -16,8 +16,6 @@ */ package org.apache.kahadb.page; -import com.sun.tools.javac.tree.Tree.TopLevel; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -30,12 +28,13 @@ import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.Map; import java.util.NoSuchElementException; import java.util.Properties; +import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,10 +43,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.Marshaller; -import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.page.Chunk.ChunkMarshaller; import org.apache.kahadb.util.DataByteArrayInputStream; import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.IOExceptionSupport; import org.apache.kahadb.util.IOHelper; import org.apache.kahadb.util.IntrospectionSupport; import org.apache.kahadb.util.LRUCache; @@ -93,12 +91,14 @@ private AtomicBoolean loaded = new AtomicBoolean(); private LRUCache pageCache; - private boolean enableRecoveryBuffer=true; - private boolean enableSyncedWrites=true; - private boolean enablePageCaching=false;//this is off by default - see AMQ-1667 + private boolean enableRecoveryBuffer=false; + private boolean enableSyncedWrites=false; + private boolean enablePageCaching=true; + private boolean enableAsyncWrites=false; + private int pageCacheSize = 10; - private LinkedHashMap writes=new LinkedHashMap(); + private TreeMap writes=new TreeMap(); private Thread writerThread; AtomicBoolean stopWriter = new AtomicBoolean(); private CountDownLatch checkpointLatch; @@ -168,16 +168,16 @@ * Internally used by the double write buffer implementation used in this class. */ private class PageWrite { - final Page page; + Page page; byte[] current; byte[] diskBound; public PageWrite(Page page, byte[] data) { - this.page = page; - this.current = data; + setCurrent(page, data); } - public void setCurrent(byte[] data) { + public void setCurrent(Page page, byte[] data) { + this.page=page; current=data; } @@ -193,6 +193,11 @@ diskBound=null; return current == null; } + + @Override + public String toString() { + return "PageWrite{pageId="+page.getPageId()+"}"; + } } @@ -387,22 +392,23 @@ if( !loaded.get() ) { throw new IllegalStateException("Cannot allocate a page when the page file is not loaded"); } - + Page page = null; - if(!freeList.isEmpty()) { - long pageId = freeList.removeFirst(); - page = new Page(); - page.setPageId(pageId); - page.setType(Page.FREE_TYPE); - } else { - // allocate one + + // We may need to create a new free page... 
+ if(freeList.isEmpty()) { page = new Page(); page.setPageId(nextFreePageId); page.setType(Page.FREE_TYPE); nextFreePageId ++; +// LOG.debug("allocated: "+page.getPageId()); write(page, null); } - addToCache(page); + + long pageId = freeList.removeFirst(); + page = new Page(); + page.setPageId(pageId); + page.setType(Page.FREE_TYPE); return page; } @@ -423,28 +429,30 @@ Page page = null; Sequence seq = freeList.removeFirstSequence(count); - if(seq!=null) { - page = new Page(); - page.setPageId(seq.getFirst()); - page.setType(Page.FREE_TYPE); - } else { + if(seq==null) { - // allocate the pages.. - Page t = new Page(); - while( count > 0 ) { - t.setPageId(nextFreePageId); - t.setType(Page.FREE_TYPE); + // We may need to create a new free page... + page = new Page(); + int c=count; + while( c > 0 ) { + page.setPageId(nextFreePageId); + page.setType(Page.FREE_TYPE); nextFreePageId ++; - write(t, null); - count--; - +// LOG.debug("allocate writing: "+page.getPageId()); + write(page, null); + c--; if( page == null ) { - page = t; + page = page; } } + seq = freeList.removeFirstSequence(count); } - addToCache(page); + + page = new Page(); + page.setPageId(seq.getFirst()); + page.setType(Page.FREE_TYPE); +// LOG.debug("allocated: "+page.getPageId()); return page; } @@ -458,13 +466,63 @@ * if the PageFile is not loaded */ public void free(Page page) throws IOException { + page.setType(Page.FREE_TYPE); + free(page.getPageId(), 1); + } + + /** + * Frees up a previously allocated page so that it can be re-allocated again. + * + * @param page the page to free up + * @throws IOException + * If an disk error occurred. + * @throws IllegalStateException + * if the PageFile is not loaded + */ + public void free(long pageId) throws IOException { + free(pageId, 1); + } + + /** + * Frees up a previously allocated sequence of pages so that it can be re-allocated again. + * + * @param page the initial page of the sequence that will be getting freed + * @param count the number of pages in the sequence + * + * @throws IOException + * If an disk error occurred. + * @throws IllegalStateException + * if the PageFile is not loaded + */ + public void free(Page page, int count) throws IOException { + page.setType(Page.FREE_TYPE); + free(page.getPageId(), count); + } + + /** + * Frees up a previously allocated sequence of pages so that it can be re-allocated again. + * + * @param page the initial page of the sequence that will be getting freed + * @param count the number of pages in the sequence + * + * @throws IOException + * If an disk error occurred. + * @throws IllegalStateException + * if the PageFile is not loaded + */ + public void free(long pageId, int count) throws IOException { if( !loaded.get() ) { throw new IllegalStateException("Cannot free a page when the page file is not loaded"); } - removeFromCache(page); - page.setType(Page.FREE_TYPE); - write(page, null); - freeList.add(page.getPageId()); + + Page page = new Page(); + long initialId = pageId; + for (int i = 0; i < count; i++) { + page.setPageId(initialId+i); + page.setType(Page.FREE_TYPE); +// LOG.debug("free: "+page.getPageId()); + write(page, null); + } } /** @@ -506,7 +564,7 @@ // Can't load invalid offsets... if (page.getPageId() < 0) { - page.setTxId(Page.INVALID_TYPE); + page.setType(Page.INVALID_TYPE); return; } @@ -523,6 +581,7 @@ dataIn.restart(readBuffer); // Unmarshall it. +// LOG.debug("load: "+page.getPageId()); page.read(dataIn, marshaller); // Cache it. @@ -620,6 +679,26 @@ }; } + /** + * Updates multiple pages in a single unit of work. 
+ * + * @param pages + * the pages to write. The Pages object must be fully populated with a valid pageId, type, and data. + * @param marshaller + * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data. + * @throws IOException + * If an disk error occurred. + * @throws IllegalStateException + * if the PageFile is not loaded + */ + public void write(Collection pages, ChunkMarshaller marshaller) throws IOException { + // TODO: Need to update double buffer impl so that it handles a collection of writes. As is right now, + // the pages in this write may be split across multiple write batches which means that they + // will not get applied as a unit of work. + for (Page page : pages) { + write(page, marshaller); + } + } /** * @@ -637,7 +716,7 @@ if( !loaded.get() ) { throw new IllegalStateException("Cannot wriate a page when the page file is not loaded"); } - + DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize); page.setTxId(nextTxid.get()); page.write(dataOut, marshaller); @@ -645,9 +724,12 @@ throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size()); } + page = page.copy(); Long key = page.getPageId(); + addToCache(page); - LOG.debug("Page write request for offset: "+page.getPageId()); +// LOG.debug("write: "+page.getPageId()); + synchronized( writes ) { // If it's not in the write cache... PageWrite write = writes.get(key); @@ -655,14 +737,26 @@ write = new PageWrite(page, dataOut.getData()); writes.put(key, write); } else { - write.setCurrent(dataOut.getData()); + write.setCurrent(page, dataOut.getData()); } // Once we start approaching capacity, notify the writer to start writing if( canStartWriteBatch() ) { - writes.notify(); + if( enableAsyncWrites ) { + writes.notify(); + } else { + while( canStartWriteBatch() ) { + writeBatch(-1, TimeUnit.MILLISECONDS); + } + } } } + + if( page.getType() == Page.FREE_TYPE ) { + removeFromCache(page); + freeList.add(page.getPageId()); + } + } /** @@ -675,7 +769,7 @@ */ public long checkpoint() throws IOException { - if( stopWriter.get() ) { + if( enableAsyncWrites && stopWriter.get() ) { throw new IOException("Page file already stopped: checkpointing is not allowed"); } @@ -689,7 +783,13 @@ this.checkpointLatch = new CountDownLatch(1); } checkpointLatch = this.checkpointLatch; - writes.notify(); + if( enableAsyncWrites ) { + writes.notify(); + } else { + while( !writes.isEmpty() ) { + writeBatch(-1, TimeUnit.MILLISECONDS); + } + } } try { checkpointLatch.await(); @@ -711,11 +811,14 @@ private boolean canStartWriteBatch() { int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER); - // The constant 10 here controls how soon write batches start going to disk.. - // would be nice to figure out how to auto tune that value. Make to small and - // we reduce through put because we are locking the write mutex too offen doing writes - - return capacityUsed >= 10 || checkpointLatch!=null; + if( enableAsyncWrites ) { + // The constant 10 here controls how soon write batches start going to disk.. + // would be nice to figure out how to auto tune that value. 
Make to small and + // we reduce through put because we are locking the write mutex too offen doing writes + return capacityUsed >= 10 || checkpointLatch!=null; + } else { + return capacityUsed >= 80 || checkpointLatch!=null; + } } @@ -727,14 +830,14 @@ * @throws InterruptedException * @throws IOException */ - private boolean doWrites(long timeout, TimeUnit unit) throws IOException { + private boolean writeBatch(long timeout, TimeUnit unit) throws IOException { int batchLength=8+4; // Account for the: lastTxid + recovery record counter ArrayList batch = new ArrayList(MAX_PAGES_IN_RECOVERY_BUFFER); synchronized( writes ) { // If there is not enough to write, wait for a notification... - if( !canStartWriteBatch() ) { + if( !canStartWriteBatch() && timeout>=0 ) { releaseCheckpointWaiter(); try { writes.wait(unit.toMillis(timeout)); @@ -800,9 +903,9 @@ // Sync again if( enableSyncedWrites ) { writeFile.getFD().sync(); - LOG.debug("Page write complete tx: "+txId+", pages: "+pageOffsets); } +// LOG.debug("write done: "+txId+", pages: "+pageOffsets); nextTxid.incrementAndGet(); synchronized( writes ) { @@ -897,27 +1000,35 @@ } private void startWriter() { - stopWriter.set(false); - writerThread = new Thread("Page Writer") { - @Override - public void run() { - try { - while( !stopWriter.get() ) { - doWrites(1000, TimeUnit.MILLISECONDS); + synchronized( writes ) { + if( enableAsyncWrites ) { + stopWriter.set(false); + writerThread = new Thread("Page Writer") { + @Override + public void run() { + try { + while( !stopWriter.get() ) { + writeBatch(1000, TimeUnit.MILLISECONDS); + } + } catch (Throwable e) { + e.printStackTrace(); + } finally { + releaseCheckpointWaiter(); + } } - } catch (IOException e) { - e.printStackTrace(); - } finally { - releaseCheckpointWaiter(); - } + }; + writerThread.start(); } - }; - writerThread.start(); + } } private void stopWriter() throws InterruptedException { - stopWriter.set(true); - writerThread.join(); + synchronized( writes ) { + if( enableAsyncWrites ) { + stopWriter.set(true); + writerThread.join(); + } + } } /////////////////////////////////////////////////////////////////// @@ -942,21 +1053,17 @@ /////////////////////////////////////////////////////////////////// private Page getFromCache(long pageId) { + synchronized(writes) { + PageWrite pageWrite = writes.get(pageId); + if( pageWrite != null ) { + return pageWrite.page; + } + } + Page result = null; if (enablePageCaching) { result = pageCache.get(pageId); } - if( result == null ) { - synchronized(writes) { - PageWrite pageWrite = writes.get(pageId); - if( pageWrite != null ) { - result = pageWrite.page; - } - } - if (enablePageCaching) { - pageCache.put(pageId, result); - } - } return result; } @@ -1109,6 +1216,13 @@ } /** + * @return the amount of content data that a page can hold. + */ + public int getPageContentSize() { + return this.pageSize-Page.PAGE_HEADER_SIZE; + } + + /** * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk, * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting * can no longer be changed. 
@@ -1151,5 +1265,13 @@ public void setPageCacheSize(int pageCacheSize) { this.pageCacheSize = pageCacheSize; } + + public boolean isEnableAsyncWrites() { + return enableAsyncWrites; + } + + public void setEnableAsyncWrites(boolean enableAsyncWrites) { + this.enableAsyncWrites = enableAsyncWrites; + } } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java?rev=687919&r1=687918&r2=687919&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java Thu Aug 21 18:16:06 2008 @@ -23,6 +23,9 @@ public int offset; public int length; + public ByteSequence() { + } + public ByteSequence(byte data[]) { this.data = data; this.offset = 0; Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java?rev=687919&r1=687918&r2=687919&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java Thu Aug 21 18:16:06 2008 @@ -100,4 +100,22 @@ return size; } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + boolean first=true; + T cur = getHead(); + while( cur!=null ) { + if( !first ) { + sb.append(", "); + } + sb.append(cur); + first=false; + cur = cur.getNext(); + } + sb.append("]"); + return sb.toString(); + } + } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=687919&r1=687918&r2=687919&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Aug 21 18:16:06 2008 @@ -254,4 +254,9 @@ public void clear() { sequences = new LinkedNodeList(); } + + @Override + public String toString() { + return sequences.toString(); + } } \ No newline at end of file Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java?rev=687919&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java Thu Aug 21 18:16:06 2008 @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.page; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; + +import junit.framework.TestCase; +import org.apache.kahadb.StringMarshaller; +import org.apache.kahadb.page.Chunk.PageInputStream; +import org.apache.kahadb.page.Chunk.PageOutputStream; + +public class ChunkTest extends TestCase { + + static final short TEST_TYPE = 65; + + public void testChunkStreams() throws IOException { + + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.load(); + + long id = pf.allocate().getPageId(); + + PageOutputStream pos = new Chunk.PageOutputStream(pf, id); + DataOutputStream os = new DataOutputStream(pos); + for( int i=0; i < 10000; i++) { + os.writeUTF("Test string:"+i); + } + + os.close(); + System.out.println("Chuncks used: "+pos.getPageCount()); + + // Reload the page file. + pf.unload(); + pf.load(); + + PageInputStream pis = new PageInputStream(pf, id); + DataInputStream is = new DataInputStream(pis); + for( int i=0; i < 10000; i++) { + assertEquals("Test string:"+i, is.readUTF()); + } + assertEquals(-1, is.read()); + is.close(); + + System.out.println("Chuncks used: "+pis.getPageCount()); + + pf.unload(); + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=687919&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Thu Aug 21 18:16:06 2008 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kahadb.page; + +import java.io.File; + +import org.apache.kahadb.Store; +import org.apache.kahadb.impl.index.Index; +import org.apache.kahadb.impl.index.IndexBenchmark; + +public class HashIndexBenchMark extends IndexBenchmark { + + @Override + protected Index createIndex(File root, String name) throws Exception { + + PageFile pf = new PageFile(root, name); + pf.load(); + HashIndex index = new HashIndex(indexManager, pf,pf.allocate().getPageId()); + index.setKeyMarshaller(Store.STRING_MARSHALLER); + +// index.setEnableRecoveryBuffer(false); +// index.setEnableSyncedWrites(false); + return index; + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java?rev=687919&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java Thu Aug 21 18:16:06 2008 @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kahadb.page; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import junit.framework.TestCase; +import org.apache.kahadb.Store; +import org.apache.kahadb.impl.index.IndexItem; +import org.apache.kahadb.impl.index.IndexManager; +import org.apache.kahadb.util.IOHelper; + +/** + * Test a HashIndex + */ +public class HashTest extends TestCase { + + private static final int COUNT = 10000; + + private HashIndex hashIndex; + private File directory; + private IndexManager indexManager; + private PageFile pf; + + /** + * @throws java.lang.Exception + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + super.setUp(); + directory = new File(IOHelper.getDefaultDataDirectory()); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + + pf = new PageFile(directory, "im-hash-test"); + pf.load(); + indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong()); + + this.hashIndex = new HashIndex(indexManager, pf, pf.allocate().getPageId()); + this.hashIndex.setBinCapacity(12); + this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER); + } + + public void testHashIndex() throws Exception { + String keyRoot = "key:"; + this.hashIndex.load(); + doInsert(keyRoot); + this.hashIndex.unload(); + this.hashIndex.load(); + checkRetrieve(keyRoot); + doRemove(keyRoot); + this.hashIndex.unload(); + this.hashIndex.load(); + doInsert(keyRoot); + doRemoveHalf(keyRoot); + doInsertHalf(keyRoot); + this.hashIndex.unload(); + this.hashIndex.load(); + checkRetrieve(keyRoot); + this.hashIndex.unload(); + } + + void doInsert(String keyRoot) throws Exception { + for (int i = 0; i < COUNT; i++) { + IndexItem value = indexManager.createNewIndex(); + indexManager.storeIndex(value); + hashIndex.store(keyRoot + i, value); + } + } + + void checkRetrieve(String keyRoot) throws IOException { + for (int i = 0; i < COUNT; i++) { + IndexItem item = (IndexItem) hashIndex.get(keyRoot + i); + assertNotNull("Key missing: "+keyRoot + i, item); + } + } + + void doRemoveHalf(String keyRoot) throws Exception { + for (int i = 0; i < COUNT; i++) { + if (i % 2 == 0) { + hashIndex.remove(keyRoot + i); + } + + } + } + + void doInsertHalf(String keyRoot) throws Exception { + for (int i = 0; i < COUNT; i++) { + if (i % 2 == 0) { + IndexItem value = indexManager.createNewIndex(); + indexManager.storeIndex(value); + hashIndex.store(keyRoot + i, value); + } + } + } + + void doRemove(String keyRoot) throws Exception { + for (int i = 0; i < COUNT; i++) { + hashIndex.remove(keyRoot + i); + } + for (int i = 0; i < COUNT; i++) { + IndexItem item = (IndexItem) hashIndex.get(keyRoot + i); + assertNull(item); + } + } + + void doRemoveBackwards(String keyRoot) throws Exception { + for (int i = COUNT - 1; i >= 0; i--) { + hashIndex.remove(keyRoot + i); + } + for (int i = 0; i < COUNT; i++) { + IndexItem item = (IndexItem) hashIndex.get(keyRoot + i); + assertNull(item); + } + } + + /** + * @throws java.lang.Exception + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + super.tearDown(); + pf.unload(); + } +}
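
A minimal usage sketch of the chunk streams added in Chunk.java above, condensed from what ChunkTest.java in this revision exercises. The directory, class name, and record strings are illustrative only; the API calls (PageFile, Chunk.PageOutputStream, Chunk.PageInputStream) are the ones introduced by this commit.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.IOException;

    import org.apache.kahadb.page.Chunk;
    import org.apache.kahadb.page.PageFile;

    public class ChunkStreamSketch {
        public static void main(String[] args) throws IOException {
            PageFile pf = new PageFile(new File("target/sketch-data"), "chunk-sketch");
            pf.load();

            // Allocate the head page of the chunk stream and remember its id.
            long headPageId = pf.allocate().getPageId();

            // Write more data than fits in one page; close() marshals it into
            // a linked list of CHUNK_TYPE pages starting at headPageId.
            Chunk.PageOutputStream pos = new Chunk.PageOutputStream(pf, headPageId);
            DataOutputStream out = new DataOutputStream(pos);
            for (int i = 0; i < 1000; i++) {
                out.writeUTF("record:" + i);
            }
            out.close();
            System.out.println("wrote " + pos.getPageCount() + " pages");

            // Read it back; PageInputStream follows the next-chunk links page by page.
            Chunk.PageInputStream pis = new Chunk.PageInputStream(pf, headPageId);
            DataInputStream in = new DataInputStream(pis);
            for (int i = 0; i < 1000; i++) {
                if (!("record:" + i).equals(in.readUTF())) {
                    throw new IOException("unexpected record at " + i);
                }
            }
            in.close();
            System.out.println("read " + pis.getPageCount() + " pages");

            pf.unload();
        }
    }

Note that close() on the output stream is what actually writes the pages; until then everything is buffered in memory, which matches how HashBin.store() uses it.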
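
The same style of sketch for the new HashIndex, condensed from the setup in HashTest.java above. The directory and key are illustrative; the IndexManager and Store.STRING_MARSHALLER wiring follows the test, and values are StoreEntry offsets resolved back through the IndexManager.

    import java.io.File;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kahadb.Store;
    import org.apache.kahadb.StoreEntry;
    import org.apache.kahadb.impl.index.IndexItem;
    import org.apache.kahadb.impl.index.IndexManager;
    import org.apache.kahadb.page.HashIndex;
    import org.apache.kahadb.page.PageFile;

    public class HashIndexSketch {
        public static void main(String[] args) throws Exception {
            File dir = new File("target/hash-sketch");
            dir.mkdirs();

            PageFile pf = new PageFile(dir, "hash-sketch");
            pf.load();
            IndexManager indexManager = new IndexManager(dir, "hash-sketch", "rw", null, new AtomicLong());

            // One page holds the index metadata; the bin pages are allocated on first load().
            HashIndex index = new HashIndex(indexManager, pf, pf.allocate().getPageId());
            index.setKeyMarshaller(Store.STRING_MARSHALLER);
            index.load();

            // Values are IndexItems managed by the IndexManager; the index stores their offsets.
            IndexItem value = indexManager.createNewIndex();
            indexManager.storeIndex(value);
            index.store("key:1", value);

            StoreEntry found = index.get("key:1");
            System.out.println("offset for key:1 = " + (found != null ? found.getOffset() : -1));

            index.remove("key:1");
            index.unload();
            pf.unload();
        }
    }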
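
One detail that is easy to miss in the HashIndex diff: bins are addressed as binPageId + indexFor(key, binCapacity), which is why load() allocates them with a single pageFile.allocate(binCapacity) call (a contiguous run of pages) and why a resize has to copy every bin into a freshly allocated run before freeing the old one. Growing and shrinking is driven by the number of active (non-empty) bins, not the entry count. A quick check of calcThresholds() with the defaults in this revision (binCapacity = 1024, loadFactor = 75); the numbers in the comments are just the arithmetic worked out:

    public class HashIndexThresholds {
        public static void main(String[] args) {
            int binCapacity = 1024; // DEFAULT_BIN_CAPACITY
            int loadFactor = 75;    // DEFAULT_LOAD_FACTOR, in percent

            // Mirrors HashIndex.calcThresholds():
            int increaseThreshold = (binCapacity * loadFactor) / 100;                // 768
            int decreaseThreshold = (binCapacity * loadFactor * loadFactor) / 20000; // 288

            // store(): once 768 bins are non-empty the capacity doubles (capped at
            // maximumBinCapacity); remove(): at 288 or fewer active bins it halves
            // (floored at minimumBinCapacity).
            System.out.println("grow past " + increaseThreshold + " active bins");
            System.out.println("shrink at " + decreaseThreshold + " active bins or fewer");
        }
    }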