flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/3] git commit: [FLINK-955] ResizingHashTable: automatic resizing, IndexOutOfBoundsException fixed, pointers
Date Wed, 16 Jul 2014 16:50:48 GMT
[FLINK-955] ResizingHashTable: automatic resizing, IndexOutOfBoundsException fixed, pointers

This closes #57


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c04c9bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c04c9bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c04c9bbb

Branch: refs/heads/master
Commit: c04c9bbb27905b1e3332aca7073a64fab24ade1c
Parents: 8ed2b76
Author: rwaury <robert.waury@googlemail.com>
Authored: Mon Jun 23 23:31:48 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 16 18:49:51 2014 +0200

----------------------------------------------------------------------
 .../operators/hash/CompactingHashTable.java     | 590 +++++++++++++------
 .../operators/hash/InMemoryPartition.java       |  60 +-
 .../apache/flink/runtime/util/FileUtils.java    |   2 -
 .../apache/flink/runtime/util/IntArrayList.java |  74 +++
 .../util/KeyGroupedMutableObjectIterator.java   |   6 +-
 .../flink/runtime/util/LongArrayList.java       |  74 +++
 .../operators/hash/MemoryHashTableTest.java     | 383 ++++++++++--
 7 files changed, 946 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 890ffe3..239786d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.hash;
 
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,26 +31,24 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
+import org.apache.flink.runtime.util.IntArrayList;
+import org.apache.flink.runtime.util.LongArrayList;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An implementation of an in-memory Hash Table for variable-length records. 
  * <p>
  * The design of this class follows on many parts the design presented in
  * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al..
- *<p>
- *
- *
+ * <p>
  * <hr>
- * 
  * The layout of the buckets inside a memory segment is as follows:
  * 
  * <pre>
  * +----------------------------- Bucket x ----------------------------
- * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -61,8 +57,8 @@ import org.apache.flink.util.MutableObjectIterator;
  * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
  * |
  * +---------------------------- Bucket x + 1--------------------------
- * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -73,10 +69,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * | ...
  * |
  * </pre>
- * @param <T>
- * 
- * @param T record type stored in hash table
- * 
+ * @param <T> Record type stored in hash table
  */
 public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 
@@ -121,7 +114,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * actual hash table buckets, consisting of a 4 byte hash value and an 8 byte
 	 * pointer, plus the overhead for the stored length field.
 	 */
-	private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES;
+	private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES + 2;
 	
 	// -------------------------- Bucket Size and Structure -------------------------------------
 	
@@ -131,7 +124,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	
 	private static final int BUCKET_HEADER_LENGTH = 16;
 	
-	private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_OVERHEAD_BYTES;
+	private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;
 	
 	private static final int BUCKET_POINTER_START_OFFSET = BUCKET_HEADER_LENGTH + (NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN);
 	
@@ -214,6 +207,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 */
 	private int numBuckets;
 	
+	/**
+	 * Flag that prevents a resize from being triggered during another resize, since the code paths are interleaved.
+	 */
+	private boolean isResizing = false;
+	
 	private AtomicBoolean closed = new AtomicBoolean();
 	
 	private boolean running = true;
@@ -237,7 +235,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			throw new NullPointerException();
 		}
 		if (memorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
-			throw new IllegalArgumentException("Too few memory segments provided. Hash Join needs at least " + 
+			throw new IllegalArgumentException("Too few memory segments provided. Hash Table needs at least " + 
 				MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
 		}
 		
@@ -275,8 +273,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	
 	/**
 	 * Build the hash table
-	 * 
-	 * @throws IOException Thrown, if an I/O problem occurs while spilling a partition.
 	 */
 	public void open() {
 		// sanity checks
@@ -300,7 +296,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * Closes the hash table. This effectively releases all internal structures and closes all
 	 * open files and removes them. The call to this method is valid both as a cleanup after the
 	 * complete inputs were properly processed, and as an cancellation call, which cleans up
-	 * all resources that are currently held by the hash join.
+	 * all resources that are currently held by the hash table. If another process still accesses the hash 
+	 * table after close has been called, no operations will be performed.
 	 */
 	public void close() {
 		// make sure that we close only once
@@ -355,12 +352,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		
 		// get the basic characteristics of the bucket
 		final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET);
-		final InMemoryPartition<T> p = this.partitions.get(partitionNumber);
+		InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
 		
 		
 		long pointer;
 		try {
-			pointer = p.appendRecord(record);
+			pointer = partition.appendRecord(record);
 			if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
 				this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
 			}
@@ -368,44 +365,34 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			try {
 				compactPartition(partitionNumber);
 				// retry append
-				pointer = this.partitions.get(partitionNumber).appendRecord(record);
+				partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+				pointer = partition.appendRecord(record);
 			} catch (EOFException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-						" minPartition: " + getMinPartition() +
-						" maxPartition: " + getMaxPartition() +
-						" number of overflow segments: " + getOverflowSegmentCount() +
-						" bucketSize: " + this.buckets.length +
-						" Message: " + ex.getMessage());
+				throw new RuntimeException("Memory ran out. Compaction failed. " + 
+											getMemoryConsumptionString() +
+											" Message: " + ex.getMessage());
 			} catch (IndexOutOfBoundsException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-						" minPartition: " + getMinPartition() +
-						" maxPartition: " + getMaxPartition() +
-						" number of overflow segments: " + getOverflowSegmentCount() +
-						" bucketSize: " + this.buckets.length +
-						" Message: " + ex.getMessage());
+				throw new RuntimeException("Memory ran out. Compaction failed. " + 
+											getMemoryConsumptionString() +
+											" Message: " + ex.getMessage());
 			}
 		} catch (IndexOutOfBoundsException e1) {
 			try {
 				compactPartition(partitionNumber);
 				// retry append
-				pointer = this.partitions.get(partitionNumber).appendRecord(record);
+				partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+				pointer = partition.appendRecord(record);
 			} catch (EOFException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-						" minPartition: " + getMinPartition() +
-						" maxPartition: " + getMaxPartition() +
-						" number of overflow segments: " + getOverflowSegmentCount() +
-						" bucketSize: " + this.buckets.length +
-						" Message: " + ex.getMessage());
+				throw new RuntimeException("Memory ran out. Compaction failed. " + 
+											getMemoryConsumptionString() +
+											" Message: " + ex.getMessage());
 			} catch (IndexOutOfBoundsException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-						" minPartition: " + getMinPartition() +
-						" maxPartition: " + getMaxPartition() +
-						" number of overflow segments: " + getOverflowSegmentCount() +
-						" bucketSize: " + this.buckets.length +
-						" Message: " + ex.getMessage());
+				throw new RuntimeException("Memory ran out. Compaction failed. " + 
+											getMemoryConsumptionString() +
+											" Message: " + ex.getMessage());
 			}
 		}
-		insertBucketEntryFromStart(p, bucket, bucketInSegmentPos, hashCode, pointer);
+		insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
 	}
 	
 	
@@ -435,6 +422,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		if(this.closed.get()) {
 			return;
 		}
+		
 		final int searchHashCode = hash(this.buildSideComparator.hash(record));
 		final int posHashCode = searchHashCode % this.numBuckets;
 		
@@ -446,7 +434,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		
 		// get the basic characteristics of the bucket
 		final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-		final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
+		InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
 		final MemorySegment[] overflowSegments = partition.overflowSegments;
 		
 		this.buildSideComparator.setReference(record);
@@ -490,21 +478,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						try {
 							compactPartition(partition.getPartitionNumber());
 							// retry append
-							newPointer = this.partitions.get(partitionNumber).appendRecord(record);
+							partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+							newPointer = partition.appendRecord(record);
 						} catch (EOFException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-									" minPartition: " + getMinPartition() +
-									" maxPartition: " + getMaxPartition() +
-									" number of overflow segments: " + getOverflowSegmentCount() +
-									" bucketSize: " + this.buckets.length +
-									" Message: " + ex.getMessage());
+							throw new RuntimeException("Memory ran out. Compaction failed. " + 
+														getMemoryConsumptionString() +
+														" Message: " + ex.getMessage());
 						} catch (IndexOutOfBoundsException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-									" minPartition: " + getMinPartition() +
-									" maxPartition: " + getMaxPartition() +
-									" number of overflow segments: " + getOverflowSegmentCount() +
-									" bucketSize: " + this.buckets.length +
-									" Message: " + ex.getMessage());
+							throw new RuntimeException("Memory ran out. Compaction failed. " + 
+														getMemoryConsumptionString() +
+														" Message: " + ex.getMessage());
 						}
 						bucket.putLong(pointerOffset, newPointer);
 						return;
@@ -514,21 +497,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						try {
 							compactPartition(partition.getPartitionNumber());
 							// retry append
-							newPointer = this.partitions.get(partitionNumber).appendRecord(record);
+							partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+							newPointer = partition.appendRecord(record);
 						} catch (EOFException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-									" minPartition: " + getMinPartition() +
-									" maxPartition: " + getMaxPartition() +
-									" number of overflow segments: " + getOverflowSegmentCount() +
-									" bucketSize: " + this.buckets.length +
-									" Message: " + ex.getMessage());
+							throw new RuntimeException("Memory ran out. Compaction failed. " + 
+														getMemoryConsumptionString() +
+														" Message: " + ex.getMessage());
 						} catch (IndexOutOfBoundsException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() + 
-									" minPartition: " + getMinPartition() +
-									" maxPartition: " + getMaxPartition() +
-									" number of overflow segments: " + getOverflowSegmentCount() +
-									" bucketSize: " + this.buckets.length +
-									" Message: " + ex.getMessage());
+							throw new RuntimeException("Memory ran out. Compaction failed. " + 
+														getMemoryConsumptionString() +
+														" Message: " + ex.getMessage());
 						}
 						bucket.putLong(pointerOffset, newPointer);
 						return;
@@ -546,6 +524,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
 				// nothing found. append and insert
 				long pointer = partition.appendRecord(record);
+				//insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer);
 				insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
 				if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
 					this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
@@ -567,6 +546,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			int bucketInSegmentPos, int hashCode, long pointer)
 	throws IOException
 	{
+		boolean checkForResize = false;
 		// find the position to put the hash code and pointer
 		final int count = bucket.getInt(bucketInSegmentPos + HEADER_COUNT_OFFSET);
 		if (count < NUM_ENTRIES_PER_BUCKET) {
@@ -574,8 +554,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode);	// hash code
 			bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
 			bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count
-		}
-		else {
+		} else {
 			// we need to go to the overflow buckets
 			final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
 			final long forwardForNewBucket;
@@ -596,14 +575,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					seg.putLong(segOffset + BUCKET_POINTER_START_OFFSET + (obCount * POINTER_LEN), pointer); // pointer
 					seg.putInt(segOffset + HEADER_COUNT_OFFSET, obCount + 1); // update count
 					return;
-				}
-				else {
+				} else {
 					// no space here, we need a new bucket. this current overflow bucket will be the
 					// target of the new overflow bucket
 					forwardForNewBucket = originalForwardPointer;
 				}
-			}
-			else {
+			} else {
 				// no overflow bucket yet, so we need a first one
 				forwardForNewBucket = BUCKET_FORWARD_POINTER_NOT_SET;
 			}
@@ -628,8 +605,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				}
 				p.overflowSegments[p.numOverflowSegments] = overflowSeg;
 				p.numOverflowSegments++;
-			}
-			else {
+				checkForResize = true;
+			} else {
 				// there is space in the last overflow bucket
 				overflowBucketNum = p.numOverflowSegments - 1;
 				overflowSeg = p.overflowSegments[overflowBucketNum];
@@ -653,17 +630,23 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			
 			// set the count to one
 			overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1); 
+			if(checkForResize && !this.isResizing) {
+				// check if we should resize buckets
+				if(this.buckets.length <= getOverflowSegmentCount()) {
+					resizeHashTable();
+				}
+			}
 		}
 	}
 	
-	private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) {
+	private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
+		boolean checkForResize = false;
 		if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) {
 			// we are good in our current bucket, put the values
 			currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode);	// hash code
 			currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer
 			currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count
-		}
-		else {
+		} else {
 			// we need a new overflow bucket
 			MemorySegment overflowSeg;
 			final int overflowBucketNum;
@@ -684,8 +667,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				}
 				partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
 				partition.numOverflowSegments++;
-			}
-			else {
+				checkForResize = true;
+			} else {
 				// there is space in the last overflow segment
 				overflowBucketNum = partition.numOverflowSegments - 1;
 				overflowSeg = partition.overflowSegments[overflowBucketNum];
@@ -708,7 +691,13 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
 			
 			// set the count to one
-			overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1); 
+			overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+			if(checkForResize && !this.isResizing) {
+				// check if we should resize buckets
+				if(this.buckets.length <= getOverflowSegmentCount()) {
+					resizeHashTable();
+				}
+			}
 		}
 	}
 	
@@ -782,11 +771,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		if (s > 0) {
 			return this.availableMemory.remove(s-1);
 		} else {
-			throw new RuntimeException("Memory ran out. numPartitions: " + this.partitions.size() + 
-													" minPartition: " + getMinPartition() +
-													" maxPartition: " + getMaxPartition() + 
-													" number of overflow segments: " + getOverflowSegmentCount() +
-													" bucketSize: " + this.buckets.length);
+			throw new RuntimeException("Memory ran out. " + getMemoryConsumptionString());
 		}
 	}
 
@@ -798,7 +783,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * Gets the number of partitions to be used for an initial hash-table, when no estimates are
 	 * available.
 	 * <p>
-	 * The current logic makes sure that there are always between 10 and 127 partitions, and close
+	 * The current logic makes sure that there are always between 10 and 32 partitions, and close
 	 * to 0.1 of the number of buffers.
 	 * 
 	 * @param numBuffers The number of buffers available.
@@ -808,6 +793,53 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS));
 	}
 	
+	/**
+	 * @return String containing a summary of the memory consumption for error messages
+	 */
+	private String getMemoryConsumptionString() {
+		String result = new String("numPartitions: " + this.partitions.size() + 
+				" minPartition: " + getMinPartition() +
+				" maxPartition: " + getMaxPartition() +
+				" number of overflow segments: " + getOverflowSegmentCount() +
+				" bucketSize: " + this.buckets.length +
+				" Overall memory: " + getSize() + 
+				" Partition memory: " + getPartitionSize());
+		return result;
+	}
+	
+	/**
+	 * Size of all memory segments owned by this hash table
+	 * 
+	 * @return size in bytes
+	 */
+	private long getSize() {
+		long numSegments = 0;
+		numSegments += this.availableMemory.size();
+		numSegments += this.buckets.length;
+		for(InMemoryPartition<T> p : this.partitions) {
+			numSegments += p.getBlockCount();
+			numSegments += p.numOverflowSegments;
+		}
+		numSegments += this.compactionMemory.getBlockCount();
+		return numSegments*this.segmentSize;
+	}
+	
+	/**
+	 * Size of all memory segments owned by the partitions of this hash table excluding the compaction partition
+	 * 
+	 * @return size in bytes
+	 */
+	private long getPartitionSize() {
+		long numSegments = 0;
+		for(InMemoryPartition<T> p : this.partitions) {
+			numSegments += p.getBlockCount();
+		}
+		return numSegments*this.segmentSize;
+	}
+	
+	/**
+	 * @return number of memory segments in the largest partition
+	 */
 	private int getMaxPartition() {
 		int maxPartition = 0;
 		for(InMemoryPartition<T> p1 : this.partitions) {
@@ -818,6 +850,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		return maxPartition;
 	}
 	
+	/**
+	 * @return number of memory segments in the smallest partition
+	 */
 	private int getMinPartition() {
 		int minPartition = Integer.MAX_VALUE;
 		for(InMemoryPartition<T> p1 : this.partitions) {
@@ -828,6 +863,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		return minPartition;
 	}
 	
+	/**
+	 * @return number of memory segments used in overflow buckets
+	 */
 	private int getOverflowSegmentCount() {
 		int result = 0;
 		for(InMemoryPartition<T> p : this.partitions) {
@@ -836,29 +874,20 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		return result;
 	}
 	
+	/**
+	 * Tries to find a good value for the number of buckets.
+	 * Ensures that the number of buckets is a multiple of numPartitions.
+	 * 
+	 * @return number of buckets
+	 */
 	private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
-		// ----------------------------------------------------------------------------------------
-		// the following observations hold:
-		// 1) If the records are assumed to be very large, then many buffers need to go to the partitions
-		//    and fewer to the table
-		// 2) If the records are small, then comparatively many have to go to the buckets, and fewer to the
-		//    partitions
-		// 3) If the bucket-table is chosen too small, we will eventually get many collisions and will grow the
-		//    hash table, incrementally adding buffers.
-		// 4) If the bucket-table is chosen to be large and we actually need more buffers for the partitions, we
-		//    cannot subtract them afterwards from the table
-		//
-		// ==> We start with a comparatively small hash-table. We aim for a 200% utilization of the bucket table
-		//     when all the partition buffers are full. Most likely, that will cause some buckets to be re-hashed
-		//     and grab additional buffers away from the partitions.
-		// NOTE: This decision may be subject to changes after conclusive experiments!
-		// ----------------------------------------------------------------------------------------
-		
 		final long totalSize = ((long) bufferSize) * numBuffers;
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
-		final long bucketBytes = numRecordsStorable * RECORD_TABLE_BYTES;
-		final long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
-		
+		final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
+		long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
+		while(numBuckets % numPartitions != 0) {
+			numBuckets++;
+		}
 		return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numBuckets;
 	}
 	
@@ -874,22 +903,199 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	}
 	
 	/**
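+	 * Attempts to double the number of buckets.
+	 * 
+	 * @return true on success; false if not enough memory can be freed for the new buckets or the table is closed
+	 * @throws IOException if an inconsistent bucket entry is detected while redistributing entries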
+	 */
+	private boolean resizeHashTable() throws IOException {
+		final int newNumBuckets = 2*this.numBuckets;
+		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+		final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment;
+		final int additionalSegments = newNumSegments-this.buckets.length;
+		final int numPartitions = this.partitions.size();
+		if(this.availableMemory.size() < additionalSegments) {
+			for(int i = 0; i < numPartitions; i++) {
+				compactPartition(i);
+				if(this.availableMemory.size() >= additionalSegments) {
+					break;
+				}
+			}
+		}
+		if(this.availableMemory.size() < additionalSegments || this.closed.get()) {
+			return false;
+		} else {
+			this.isResizing = true;
+			// allocate new buckets
+			final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize;
+			MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
+			final int oldNumBuckets = this.numBuckets;
+			final int oldNumSegments = this.buckets.length;
+			MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
+			System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
+			System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
+			this.buckets = mergedBuckets;
+			this.numBuckets = newNumBuckets;
+			// initialize all new buckets
+			boolean oldSegment = (startOffset != 0);
+			final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments;
+			for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) {
+				MemorySegment seg;
+				int bucketOffset = 0;
+				if(oldSegment) { // the first couple of new buckets may be located on an old segment
+					seg = this.buckets[i];
+					for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
+						bucketOffset = k * HASH_BUCKET_SIZE;	
+						// initialize the header fields
+						seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
+						seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+						seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+					}
+				} else {
+					seg = getNextBuffer();
+					// go over all buckets in the segment
+					for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
+						bucketOffset = k * HASH_BUCKET_SIZE;	
+						// initialize the header fields
+						seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
+						seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+						seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+					}
+				}				
+				this.buckets[i] = seg;
+				oldSegment = false; // we write on at most one old segment
+			}
+			int hashOffset = 0;
+			int hash = 0;
+			int pointerOffset = 0;
+			long pointer = 0;
+			IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET);
+			LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET);
+			IntArrayList overflowHashes = new IntArrayList(64);
+			LongArrayList overflowPointers = new LongArrayList(64);
+			// go over all buckets and split them between old and new buckets
+			for(int i = 0; i < numPartitions; i++) {
+				InMemoryPartition<T> partition = this.partitions.get(i);
+				final MemorySegment[] overflowSegments = partition.overflowSegments;
+				int posHashCode = 0;
+				for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; j++) {
+					MemorySegment segment = this.buckets[j];
+					// go over all buckets in the segment belonging to the partition
+					for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < oldNumBuckets; k += numPartitions, bucket += numPartitions) {
+						int bucketOffset = k * HASH_BUCKET_SIZE;
+						if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != i) {
+							throw new IOException("Accessed wrong bucket! wanted: " + i + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
+						}
+						// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+						int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+						int numInSegment = 0;
+						pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+						hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
+						while (true) {
+							while (numInSegment < countInSegment) {
+								hash = segment.getInt(hashOffset);
+								if((hash % this.numBuckets) != bucket && (hash % this.numBuckets) != (bucket+oldNumBuckets)) {
+									throw new IOException("wanted: " + bucket + " or " + (bucket + oldNumBuckets) + " got: " + hash%this.numBuckets);
+								}
+								pointer = segment.getLong(pointerOffset);
+								hashList.add(hash);
+								pointerList.add(pointer);
+								pointerOffset += POINTER_LEN;
+								hashOffset += HASH_CODE_LEN;
+								numInSegment++;
+							}
+							// this segment is done. check if there is another chained bucket
+							final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
+							if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+								break;
+							}
+							final int overflowSegNum = (int) (forwardPointer >>> 32);
+							segment = overflowSegments[overflowSegNum];
+							bucketOffset = (int)(forwardPointer & 0xffffffff);
+							countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+							pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+							hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
+							numInSegment = 0;
+						}
+						segment = this.buckets[j];
+						bucketOffset = k * HASH_BUCKET_SIZE;
+						// reset bucket for re-insertion
+						segment.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+						segment.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+						// refill table
+						if(hashList.size() != pointerList.size()) {
+							throw new IOException("Pointer and hash counts do not match. hashes: " + hashList.size() + " pointer: " + pointerList.size());
+						}
+						int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment;
+						MemorySegment newSegment = this.buckets[newSegmentIndex];
+						// we need to avoid overflows in the first run
+						int oldBucketCount = 0;
+						int newBucketCount = 0;
+						while(!hashList.isEmpty()) {
+							hash = hashList.removeInt(hashList.size()-1);
+							pointer = pointerList.removeLong(pointerList.size()-1);
+							posHashCode = hash % this.numBuckets;
+							if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
+								bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE;
+								insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
+								oldBucketCount++;
+							} else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
+								bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE;
+								insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
+								newBucketCount++;
+							} else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
+								overflowHashes.add(hash);
+								overflowPointers.add(pointer);
+							} else {
+								throw new IOException("Accessed wrong bucket. Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode);
+							}
+						}
+						hashList.clear();
+						pointerList.clear();
+					}
+				}
+				// reset partition's overflow buckets and reclaim their memory
+				this.availableMemory.addAll(partition.resetOverflowBuckets());
+				// clear overflow lists
+				int bucketArrayPos = 0;
+				int bucketInSegmentPos = 0;
+				MemorySegment bucket = null;
+				while(!overflowHashes.isEmpty()) {
+					hash = overflowHashes.removeInt(overflowHashes.size()-1);
+					pointer = overflowPointers.removeLong(overflowPointers.size()-1);
+					posHashCode = hash % this.numBuckets; 
+					bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
+					bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+					bucket = this.buckets[bucketArrayPos];
+					insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer);
+				}
+				overflowHashes.clear();
+				overflowPointers.clear();
+			}
+			this.isResizing = false;
+			return true;
+		}
+	}
+	
+	/**
 	 * Compacts (garbage collects) partition with copy-compact strategy using compaction partition
 	 * 
-	 * @param partition partition number
+	 * @param partitionNumber partition to compact
 	 * @throws IOException 
 	 */
-	private void compactPartition(int partitionNumber) throws IOException {
-		// stop if no garbage exists or table is closed
-		if(this.partitions.get(partitionNumber).isCompacted() || this.closed.get()) {
+	private void compactPartition(final int partitionNumber) throws IOException {
+		// do nothing if table was closed, parameter is invalid or no garbage exists
+		if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
 			return;
 		}
 		// release all segments owned by compaction partition
 		this.compactionMemory.clearAllMemory(availableMemory);
 		this.compactionMemory.allocateSegments(1);
+		this.compactionMemory.pushDownPages();
 		T tempHolder = this.buildSideSerializer.createInstance();
+		final int numPartitions = this.partitions.size();
 		InMemoryPartition<T> partition = this.partitions.remove(partitionNumber);
-		final int numPartitions = this.partitions.size() + 1; // dropped one earlier
+		MemorySegment[] overflowSegments = partition.overflowSegments;
 		long pointer = 0L;
 		int pointerOffset = 0;
 		int bucketOffset = 0;
@@ -900,73 +1106,62 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; k += numPartitions, bucket += numPartitions) {
 				bucketOffset = k * HASH_BUCKET_SIZE;
 				if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != partitionNumber) {
-					throw new IOException("Accessed wrong bucket! ");
-				}
-				int count = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
-				for (int j = 0; j < NUM_ENTRIES_PER_BUCKET && j < count; j++) {
-					pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET + (j * POINTER_LEN);
-					pointer = segment.getLong(pointerOffset);
-					partition.readRecordAt(pointer, tempHolder);
-					pointer = this.compactionMemory.appendRecord(tempHolder);
-					segment.putLong(pointerOffset, pointer);
+					throw new IOException("Accessed wrong bucket! wanted: " + partitionNumber + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
 				}
-				long overflowPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
-				if(overflowPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
-					// scan overflow buckets
-					int current = NUM_ENTRIES_PER_BUCKET;
-					bucketOffset = (int) (overflowPointer & 0xffffffff);
-					pointerOffset = ((int) (overflowPointer & 0xffffffff)) + BUCKET_POINTER_START_OFFSET;
-					int overflowSegNum = (int) (overflowPointer >>> 32);
-					count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + HEADER_COUNT_OFFSET);
-					while(current < count) {
-						pointer = partition.overflowSegments[overflowSegNum].getLong(pointerOffset);
+				// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+				int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+				int numInSegment = 0;
+				pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+				while (true) {
+					while (numInSegment < countInSegment) {
+						pointer = segment.getLong(pointerOffset);
 						partition.readRecordAt(pointer, tempHolder);
 						pointer = this.compactionMemory.appendRecord(tempHolder);
-						partition.overflowSegments[overflowSegNum].putLong(pointerOffset, pointer);
-						current++;
-						if(current % NUM_ENTRIES_PER_BUCKET == 0) {
-							count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + HEADER_COUNT_OFFSET);
-							overflowPointer = partition.overflowSegments[overflowSegNum].getLong(bucketOffset + HEADER_FORWARD_OFFSET);
-							if(overflowPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-								break;
-							}
-							overflowSegNum = (int) (overflowPointer >>> 32);
-							bucketOffset = (int) (overflowPointer & 0xffffffff);
-							pointerOffset = ((int) (overflowPointer & 0xffffffff)) + BUCKET_POINTER_START_OFFSET;
-						} else {
-							pointerOffset += POINTER_LEN;
-						}
+						segment.putLong(pointerOffset, pointer);
+						pointerOffset += POINTER_LEN;
+						numInSegment++;
+					}
+					// this segment is done. check if there is another chained bucket
+					final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
+					if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+						break;
 					}
+					final int overflowSegNum = (int) (forwardPointer >>> 32);
+					segment = overflowSegments[overflowSegNum];
+					bucketOffset = (int)(forwardPointer & 0xffffffff);
+					countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+					pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+					numInSegment = 0;
 				}
+				segment = this.buckets[i];
 			}
 		}
 		// swap partition with compaction partition
 		this.compactionMemory.setPartitionNumber(partitionNumber);
 		this.partitions.add(partitionNumber, compactionMemory);
-		this.compactionMemory = partition;
-		this.partitions.get(partitionNumber).overflowSegments = this.compactionMemory.overflowSegments;
-		this.partitions.get(partitionNumber).numOverflowSegments = this.compactionMemory.numOverflowSegments;
-		this.partitions.get(partitionNumber).nextOverflowBucket = this.compactionMemory.nextOverflowBucket;
+		this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments;
+		this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments;
+		this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
 		this.partitions.get(partitionNumber).setCompaction(true);
+		//this.partitions.get(partitionNumber).pushDownPages();
+		this.compactionMemory = partition;
 		this.compactionMemory.resetRecordCounter();
 		this.compactionMemory.setPartitionNumber(-1);
+		this.compactionMemory.overflowSegments = null;
+		this.compactionMemory.numOverflowSegments = 0;
+		this.compactionMemory.nextOverflowBucket = 0;
 		// try to allocate maximum segment count
-		int maxSegmentNumber = 0;
-		for (InMemoryPartition<T> e : this.partitions) {
-			if(e.getBlockCount() > maxSegmentNumber) {
-				maxSegmentNumber = e.getBlockCount();
-			}
-		}
+		this.compactionMemory.clearAllMemory(this.availableMemory);
+		int maxSegmentNumber = this.getMaxPartition();
 		this.compactionMemory.allocateSegments(maxSegmentNumber);
-		if(this.compactionMemory.getBlockCount() > maxSegmentNumber) {
-			this.compactionMemory.releaseSegments(maxSegmentNumber, availableMemory);
-		}
+		this.compactionMemory.resetRWViews();
+		this.compactionMemory.pushDownPages();
 	}
 	
 	/**
 	 * Compacts partition but may not reclaim all garbage
 	 * 
-	 * @param partition partition number
+	 * @param partitionNumber partition number
 	 * @throws IOException 
 	 */
 	@SuppressWarnings("unused")
@@ -1043,6 +1238,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			}
 		}
 
+		/**
+		 * utility function that inserts all entries from a bucket and its overflow buckets into the cache
+		 * 
+		 * @return true if last bucket was not reached yet
+		 * @throws IOException
+		 */
 		private boolean fillCache() throws IOException {
 			if(currentBucketIndex >= table.numBuckets) {
 				return false;
@@ -1069,7 +1270,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						partition.readRecordAt(pointer, target);
 						cache.add(target);
 					} catch (IOException e) {
-							throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
+							throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
 					}
 				}
 				// this segment is done. check if there is another chained bucket
@@ -1124,8 +1325,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			
 			// get the basic characteristics of the bucket
 			final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-			final InMemoryPartition<T> partition = partitions.get(partitionNumber);
-			final MemorySegment[] overflowSegments = partition.overflowSegments;
+			final InMemoryPartition<T> p = partitions.get(partitionNumber);
+			final MemorySegment[] overflowSegments = p.overflowSegments;
 			
 			this.pairComparator.setReference(probeSideRecord);
 			
@@ -1150,10 +1351,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						
 						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 						try {
-							partition.readRecordAt(pointer, targetForMatch);
+							p.readRecordAt(pointer, targetForMatch);
 							
 							if (this.pairComparator.equalToReference(targetForMatch)) {
-								this.partition = partition;
+								this.partition = p;
 								this.bucket = bucket;
 								this.pointerOffsetInBucket = pointerOffset;
 								return true;
@@ -1187,9 +1388,46 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			if(closed.get()) {
 				return;
 			}
-			long newPointer = this.partition.appendRecord(record);
+			long newPointer;
+			try {
+				newPointer = this.partition.appendRecord(record);
+			} catch (EOFException e) {
+				// system is out of memory so we attempt to reclaim memory with a copy compact run
+				try {
+					int partitionNumber = this.partition.getPartitionNumber();
+					compactPartition(partitionNumber);
+					// retry append
+					this.partition = partitions.get(partitionNumber);
+					newPointer = this.partition.appendRecord(record);
+				} catch (EOFException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " + 
+												getMemoryConsumptionString() +
+												" Message: " + ex.getMessage());
+				} catch (IndexOutOfBoundsException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " + 
+												getMemoryConsumptionString() +
+												" Message: " + ex.getMessage());
+				}
+			} catch (IndexOutOfBoundsException e) {
+				// system is out of memory so we attempt to reclaim memory with a copy compact run
+				try {
+					int partitionNumber = this.partition.getPartitionNumber();
+					compactPartition(partitionNumber);
+					// retry append
+					this.partition = partitions.get(partitionNumber);
+					newPointer = this.partition.appendRecord(record);
+				} catch (EOFException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " + 
+												getMemoryConsumptionString() +
+												" Message: " + ex.getMessage());
+				} catch (IndexOutOfBoundsException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " + 
+												getMemoryConsumptionString() +
+												" Message: " + ex.getMessage());
+				}
+			}
 			this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
-			this.partition.setCompaction(false); //FIXME Do we really create garbage here?
+			this.partition.setCompaction(false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index 3ccca9c..b90ca1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -57,9 +57,9 @@ public class InMemoryPartition<T> {
 	
 	private final ListMemorySegmentSource availableMemory;
 	
-	private final WriteView writeView;
+	private WriteView writeView;
 	
-	private final ReadView readView;
+	private ReadView readView;
 	
 	private long recordCounter;				// number of records in this partition including garbage
 	
@@ -69,6 +69,10 @@ public class InMemoryPartition<T> {
 	
 	private boolean compacted;						// overwritten records since allocation or last full compaction
 	
+	private int pageSize;							// segment size in bytes
+	
+	private int pageSizeInBits;						// bit width paired with pageSize (presumably log2(pageSize) - TODO confirm); kept so pushDownPages() can rebuild the views
+	
 	// --------------------------------------------------------------------------------------------------
 	
 	
@@ -100,6 +104,10 @@ public class InMemoryPartition<T> {
 		// empty partitions have no garbage
 		this.compacted = true;
 		
+		this.pageSize = pageSize;
+		
+		this.pageSizeInBits = pageSizeInBits;
+		
 		this.writeView = new WriteView(this.partitionPages, memSource, pageSize, pageSizeInBits);
 		this.readView = new ReadView(this.partitionPages, pageSize, pageSizeInBits);
 	}
@@ -148,6 +156,38 @@ public class InMemoryPartition<T> {
 	}
 	
 	/**
+	 * Rewinds the write view and then the read view of this partition to position 0.
+	 * Should only be used on the compaction partition.
+	 */
+	public void resetRWViews() {
+		final long start = 0L;
+		writeView.resetTo(start);
+		readView.setReadPosition(start);
+	}
+	
+	/**
+	 * Replaces the write view and the read view with fresh instances built on top of this
+	 * partition's current page list. CompactingHashTable calls this after (re)allocating the
+	 * compaction partition's segments.
+	 */
+	public void pushDownPages() {
+		final WriteView freshWriter = new WriteView(this.partitionPages, this.availableMemory, this.pageSize, this.pageSizeInBits);
+		this.writeView = freshWriter;
+		final ReadView freshReader = new ReadView(this.partitionPages, this.pageSize, this.pageSizeInBits);
+		this.readView = freshReader;
+	}
+	
+	/**
+	 * Resets the overflow bucket counters of this partition and hands back the memory segments
+	 * that backed its overflow buckets. Afterwards the partition owns a fresh, empty overflow
+	 * segment array of length 2. Should only be used for resizing.
+	 * 
+	 * @return the non-null overflow segments this partition owned; empty if it owned none
+	 */
+	public ArrayList<MemorySegment> resetOverflowBuckets() {
+		this.numOverflowSegments = 0;
+		this.nextOverflowBucket = 0;
+		
+		final MemorySegment[] oldSegments = this.overflowSegments;
+		// CompactingHashTable.compactPartition() sets overflowSegments to null on the compaction
+		// partition, so a null array must not cause a NullPointerException here
+		final ArrayList<MemorySegment> result = new ArrayList<MemorySegment>(oldSegments == null ? 0 : oldSegments.length);
+		if (oldSegments != null) {
+			for (MemorySegment segment : oldSegments) {
+				// unused slots of the array are null and own no memory
+				if (segment != null) {
+					result.add(segment);
+				}
+			}
+		}
+		this.overflowSegments = new MemorySegment[2];
+		return result;
+	}
+	
+	/**
 	 * @return true if garbage exists in partition
 	 */
 	public boolean isCompacted() {
@@ -179,8 +219,7 @@ public class InMemoryPartition<T> {
 			this.serializer.serialize(record, this.writeView);
 			this.recordCounter++;
 			return pointer;
-		}
-		catch (EOFException e) {
+		} catch (EOFException e) {
 			// we ran out of pages. 
 			// first, reset the pages and then we need to trigger a compaction
 			//int oldCurrentBuffer = 
@@ -224,8 +263,7 @@ public class InMemoryPartition<T> {
 			for (int k = 0; k < this.numOverflowSegments; k++) {
 				target.add(this.overflowSegments[k]);
 			}
-		}
-		
+		}	
 		// return the partition buffers
 		target.addAll(this.partitionPages);
 		this.partitionPages.clear();
@@ -248,12 +286,6 @@ public class InMemoryPartition<T> {
 		}
 	}
 	
-	public void releaseSegments(int maxSegmentNumber, ArrayList<MemorySegment> target) {
-		while(getBlockCount() > maxSegmentNumber) {
-			target.add(partitionPages.remove(partitionPages.size()-1));
-		}
-	}
-
 	@Override
 	public String toString() {
 		return String.format("Partition %d - %d records, %d partition blocks, %d bucket overflow blocks", getPartitionNumber(), getRecordCount(), getBlockCount(), this.numOverflowSegments);
@@ -285,6 +317,7 @@ public class InMemoryPartition<T> {
 			this.memSource = memSource;
 			this.sizeBits = pageSizeBits;
 			this.sizeMask = pageSize - 1;
+			this.segmentNumberOffset = 0;
 		}
 		
 
@@ -326,7 +359,7 @@ public class InMemoryPartition<T> {
 	private static final class ReadView extends AbstractPagedInputView implements SeekableDataInputView {
 
 		private final ArrayList<MemorySegment> segments;
-		
+
 		private final int segmentSizeBits;
 		
 		private final int segmentSizeMask;
@@ -346,6 +379,7 @@ public class InMemoryPartition<T> {
 			this.segments = segments;
 			this.segmentSizeBits = segmentSizeBits;
 			this.segmentSizeMask = segmentSize - 1;
+			this.segmentNumberOffset = 0;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
index 27142f7..e985d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 /**
  * This is a utility class to deal with temporary files.
- * 
  */
 public final class FileUtils {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
new file mode 100644
index 0000000..c359211
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * A small growable list of primitive {@code int} values backed by a plain array, so callers
+ * can buffer ints without boxing them into {@code Integer} objects.
+ */
+public class IntArrayList {
+	
+	/** number of leading slots of {@link #array} that currently hold list elements */
+	private int size;
+	
+	/** backing storage; only the first {@link #size} slots are meaningful */
+	private int[] array;
+
+	/**
+	 * Creates an empty list.
+	 * 
+	 * @param capacity initial length of the backing array (must not be negative)
+	 */
+	public IntArrayList(final int capacity) {
+		this.array = new int[capacity];
+		this.size = 0;
+	}
+	
+	/**
+	 * @return the number of elements in the list
+	 */
+	public int size() {
+		return this.size;
+	}
+	
+	/**
+	 * Appends a value to the end of the list, growing the backing array if needed.
+	 * 
+	 * @param number the value to append
+	 * @return always {@code true}
+	 */
+	public boolean add(final int number) {
+		ensureCapacity(this.size + 1);
+		this.array[this.size++] = number;
+		return true;
+	}
+
+	/**
+	 * Removes the element at the given position and shifts every later element one slot left.
+	 * Removing the last element ({@code index == size() - 1}) moves no data.
+	 * 
+	 * @param index position of the element to remove
+	 * @return the removed value
+	 * @throws IndexOutOfBoundsException if {@code index >= size()}; a negative index fails on the
+	 *         array access with an {@link ArrayIndexOutOfBoundsException}
+	 */
+	public int removeInt(int index) {
+		if (index >= this.size) {
+			throw new IndexOutOfBoundsException("Index (" + index + ") is greater than or equal to list size (" + size + ")");
+		}
+		final int removed = this.array[index];
+		final int trailing = this.size - index - 1;
+		if (trailing > 0) {
+			// close the gap left by the removed element
+			System.arraycopy(this.array, index + 1, this.array, index, trailing);
+		}
+		this.size--;
+		return removed;
+	}
+	
+	/**
+	 * Empties the list. The backing array is kept (neither shrunk nor zeroed) for reuse.
+	 */
+	public void clear() {
+		this.size = 0;
+	}
+	
+	/**
+	 * @return {@code true} if the list holds no elements
+	 */
+	public boolean isEmpty() {
+		return this.size == 0;
+	}
+	
+	/**
+	 * Makes sure the backing array can hold at least {@code minLength} elements. When it must
+	 * grow, its length is doubled (capped at {@code Integer.MAX_VALUE - 8}), or set to exactly
+	 * {@code minLength} if that is larger.
+	 */
+	private void ensureCapacity(final int minLength) {
+		if (minLength <= this.array.length) {
+			return;
+		}
+		final long doubled = Math.min(2L * this.array.length, Integer.MAX_VALUE - 8);
+		final int[] grown = new int[(int) Math.max(doubled, minLength)];
+		System.arraycopy(this.array, 0, grown, 0, this.size);
+		this.array = grown;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
index 24df4e3..1e5fead 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;
@@ -28,10 +27,9 @@ import org.apache.flink.util.MutableObjectIterator;
 /**
  * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
  * A sub-iterator over all values with the same key is provided.
- * 
  */
-public final class KeyGroupedMutableObjectIterator<E>
-{
+public final class KeyGroupedMutableObjectIterator<E> {
+	
 	private final MutableObjectIterator<E> iterator;
 	
 	private final TypeSerializer<E> serializer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
new file mode 100644
index 0000000..926a00c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * A small growable list of primitive {@code long} values backed by a plain array, so callers
+ * can buffer longs without boxing them into {@code Long} objects.
+ */
+public class LongArrayList {
+		
+	/** number of leading slots of {@link #array} that currently hold list elements */
+	private int size;
+	
+	/** backing storage; only the first {@link #size} slots are meaningful */
+	private long[] array;
+	
+	/**
+	 * Creates an empty list.
+	 * 
+	 * @param capacity initial length of the backing array (must not be negative)
+	 */
+	public LongArrayList(final int capacity) {
+		this.array = new long[capacity];
+		this.size = 0;
+	}
+	
+	/**
+	 * @return the number of elements in the list
+	 */
+	public int size() {
+		return this.size;
+	}
+	
+	/**
+	 * Appends a value to the end of the list, growing the backing array if needed.
+	 * 
+	 * @param number the value to append
+	 * @return always {@code true}
+	 */
+	public boolean add(final long number) {
+		ensureCapacity(this.size + 1);
+		this.array[this.size++] = number;
+		return true;
+	}
+	
+	/**
+	 * Removes the element at the given position and shifts every later element one slot left.
+	 * Removing the last element ({@code index == size() - 1}) moves no data.
+	 * 
+	 * @param index position of the element to remove
+	 * @return the removed value
+	 * @throws IndexOutOfBoundsException if {@code index >= size()}; a negative index fails on the
+	 *         array access with an {@link ArrayIndexOutOfBoundsException}
+	 */
+	public long removeLong(int index) {
+		if (index >= this.size) {
+			throw new IndexOutOfBoundsException("Index (" + index + ") is greater than or equal to list size (" + size + ")");
+		}
+		final long removed = this.array[index];
+		final int trailing = this.size - index - 1;
+		if (trailing > 0) {
+			// close the gap left by the removed element
+			System.arraycopy(this.array, index + 1, this.array, index, trailing);
+		}
+		this.size--;
+		return removed;
+	}
+	
+	/**
+	 * Empties the list. The backing array is kept (neither shrunk nor zeroed) for reuse.
+	 */
+	public void clear() {
+		this.size = 0;
+	}
+	
+	/**
+	 * @return {@code true} if the list holds no elements
+	 */
+	public boolean isEmpty() {
+		return this.size == 0;
+	}
+	
+	/**
+	 * Makes sure the backing array can hold at least {@code minLength} elements. When it must
+	 * grow, its length is doubled (capped at {@code Integer.MAX_VALUE - 8}), or set to exactly
+	 * {@code minLength} if that is larger.
+	 */
+	private void ensureCapacity(final int minLength) {
+		if (minLength <= this.array.length) {
+			return;
+		}
+		final long doubled = Math.min(2L * this.array.length, Integer.MAX_VALUE - 8);
+		final long[] grown = new long[(int) Math.max(doubled, minLength)];
+		System.arraycopy(this.array, 0, grown, 0, this.size);
+		this.array = grown;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index fbc4f65..b644da1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparat
 import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
+import org.powermock.reflect.Whitebox;
 
 
 public class MemoryHashTableTest {
@@ -82,12 +83,16 @@ public class MemoryHashTableTest {
 	
 	private final TypePairComparator<IntPair, IntList> pairComparatorPL =new IntPairListPairComparator();
 	
-	private final int SIZE = 80; //FIXME 75 triggers serialization bug in testVariableLengthBuildAndRetrieve
+	private final int SIZE = 75;
 	
 	private final int NUM_PAIRS = 100000;
 
 	private final int NUM_LISTS = 100000;
 	
+	private final int ADDITIONAL_MEM = 100;
+	
+	private final int NUM_REWRITES = 10;
+	
 
 	private final TypeSerializer<StringPair> serializerS = new StringPairSerializer();
 	
@@ -96,7 +101,7 @@ public class MemoryHashTableTest {
 	private final TypePairComparator<StringPair, StringPair> pairComparatorS = new StringPairPairComparator();
 	
 	
-	
+	@Test
 	public void testDifferentProbers() {
 		final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
 		
@@ -106,11 +111,13 @@ public class MemoryHashTableTest {
 		AbstractHashTableProber<IntPair, IntPair> prober2 = table.getProber(comparator, pairComparator);
 		
 		assertFalse(prober1 == prober2);
+		
+		table.close();
+		assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
 	}
 	
 	@Test
 	public void testBuildAndRetrieve() {
-		
 		try {
 			final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
 			
@@ -134,8 +141,7 @@ public class MemoryHashTableTest {
 			
 			table.close();
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}
@@ -143,7 +149,6 @@ public class MemoryHashTableTest {
 	
 	@Test
 	public void testEntryIterator() {
-		
 		try {
 			final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE;
 			final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
@@ -167,8 +172,7 @@ public class MemoryHashTableTest {
 			
 			assertTrue(sum == result);
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}
@@ -197,7 +201,8 @@ public class MemoryHashTableTest {
 				assertTrue(listProber.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
-			
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
@@ -218,7 +223,6 @@ public class MemoryHashTableTest {
 				try {
 					table.insert(lists[i]);
 				} catch (Exception e) {
-					//System.out.println("index: " + i + " ");
 					throw e;
 				}
 			}
@@ -241,14 +245,13 @@ public class MemoryHashTableTest {
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(overwriteLists[i], target));
+				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
 			table.close();
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}
@@ -288,14 +291,13 @@ public class MemoryHashTableTest {
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertTrue("" + i, prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
 			table.close();
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}
@@ -343,8 +345,321 @@ public class MemoryHashTableTest {
 			
 			table.close();
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRepeatedBuildAndRetrieve() { // build once, then NUM_REWRITES rounds of overwriting every record, re-probing after each round
+		try {
+			final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE; // SIZE presumably estimates the bytes per serialized IntList -- confirm
+			
+			final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+			
+			AbstractMutableHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
+			table.open();
+			// initial build: insert every list once
+			for (int i = 0; i < NUM_LISTS; i++) {
+				try {
+					table.insert(lists[i]);
+				} catch (Exception e) {
+					throw e; // NOTE(review): rethrows unchanged, so this inner try/catch has no effect and could be removed
+				}
+			}
+
+			// baseline: this prober must find every inserted list; it is reused after each overwrite round below
+			AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+			IntList target = new IntList();
+			
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue(prober.getMatchFor(lists[i], target));
+				assertArrayEquals(lists[i].getValue(), target.getValue());
+			}
+			
+			IntList[] overwriteLists;
+			// each round draws fresh random lists and stores them through insertOrReplaceRecord
+			for(int k = 0; k < NUM_REWRITES; k++) {
+				overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+				// test replacing
+				IntList tempHolder = new IntList();
+				for (int i = 0; i < NUM_LISTS; i++) {
+					table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+				}
+				// every record written in this round must be returned by the prober created before the rounds
+				for (int i = 0; i < NUM_LISTS; i++) {
+					assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+					assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+				}
+			}
+			// closing must hand every page back; otherwise pages were lost
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testProberUpdate() { // replace each record through prober.updateMatch right after matching it, then verify the replacements
+		try {
+			final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE; // SIZE presumably estimates the bytes per serialized IntList -- confirm
+			
+			final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+			
+			AbstractMutableHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
+			table.open();
+			// initial build: insert every list once
+			for (int i = 0; i < NUM_LISTS; i++) {
+				try {
+					table.insert(lists[i]);
+				} catch (Exception e) {
+					throw e; // NOTE(review): rethrows unchanged, so this inner try/catch has no effect and could be removed
+				}
+			}
+			// replacement records; presumably they share keys with 'lists' (getRandomizedIntLists is defined elsewhere -- confirm)
+			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+			AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+			IntList target = new IntList();
+			// match each original list, then overwrite that current match via updateMatch
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue(""+i,prober.getMatchFor(lists[i], target));
+				assertArrayEquals(lists[i].getValue(), target.getValue());
+				prober.updateMatch(overwriteLists[i]);
+			}
+			// every replacement must now be retrievable by the same prober
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+			}
+			// closing must hand every page back; otherwise pages were lost
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
 		}
-		catch (Exception e) {
+	}
+	
+	@Test
+	public void testResize() { // force one resize of a populated table via reflection and check that all pairs stay reachable
+		try {
+			final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE; // 30 is presumably a per-pair byte estimate -- confirm
+			final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+			
+			List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+			CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+			table.open();
+			// build phase
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				table.insert(pairs[i]);
+			}
+			// baseline: every pair must be found before the resize
+			AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+			IntPair target = new IntPair();
+			
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// presumably the table keeps this same 'memory' list (not a copy), so appended pages become usable by it; the final free-page count depends on that
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable"); // called reflectively by name: renaming the method breaks this only at runtime
+			assertTrue(b.booleanValue());
+			// the prober obtained before the resize must still find every pair
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// after close, the initial pages plus ADDITIONAL_MEM must all be free
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES + ADDITIONAL_MEM, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDoubleResize() { // two consecutive reflective resizes; all pairs must stay reachable after each
+		try {
+			final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE; // 30 is presumably a per-pair byte estimate -- confirm
+			final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+			
+			List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+			CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+			table.open();
+			// build phase
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				table.insert(pairs[i]);
+			}
+			// baseline: every pair must be found before resizing
+			AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+			IntPair target = new IntPair();
+			
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// presumably the table keeps this same 'memory' list (not a copy), so appended pages become usable by it; the final free-page count depends on that
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable"); // called reflectively by name: renaming the method breaks this only at runtime
+			assertTrue(b.booleanValue());
+			// after the first resize
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+			assertTrue(b.booleanValue());
+			// after the second resize, still using the prober created before both resizes
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// expected free pages after close: NUM_MEM_PAGES + 2 * ADDITIONAL_MEM
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES + ADDITIONAL_MEM + ADDITIONAL_MEM, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTripleResize() { // three consecutive reflective resizes; all pairs must stay reachable after each
+		try {
+			final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE; // 30 is presumably a per-pair byte estimate -- confirm
+			final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+			
+			List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+			CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+			table.open();
+			// build phase
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				table.insert(pairs[i]);
+			}
+			// baseline: every pair must be found before resizing
+			AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+			IntPair target = new IntPair();
+			
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// presumably the table keeps this same 'memory' list (not a copy), so appended pages become usable by it; the final free-page count depends on that
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable"); // called reflectively by name: renaming the method breaks this only at runtime
+			assertTrue(b.booleanValue());
+			// after the first resize
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+			assertTrue(b.booleanValue());
+			// after the second resize
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// the third resize gets 2 * ADDITIONAL_MEM pages, presumably because the table needs more room as it grows -- confirm
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(2*ADDITIONAL_MEM, PAGE_SIZE));
+			b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+			assertTrue(b.booleanValue());
+			// after the third resize, still using the prober created before all resizes
+			for (int i = 0; i < NUM_PAIRS; i++) {
+				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertEquals(pairs[i].getValue(), target.getValue());
+			}
+			// expected free pages: NUM_MEM_PAGES + (1 + 1 + 2) * ADDITIONAL_MEM added across the three resizes
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES + 4*ADDITIONAL_MEM, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
+			e.printStackTrace();
+			fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testResizeWithCompaction(){ // resize, overwrite all records, compact every partition, resize again, then verify the overwrites
+		try {
+			final int NUM_MEM_PAGES = (SIZE * NUM_LISTS / PAGE_SIZE); // SIZE presumably estimates the bytes per serialized IntList -- confirm
+			
+			final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+			
+			List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+			CompactingHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, memory);
+			table.open();
+			// initial build: insert every list once
+			for (int i = 0; i < NUM_LISTS; i++) {
+				try {
+					table.insert(lists[i]);
+				} catch (Exception e) {
+					throw e; // NOTE(review): rethrows unchanged, so this inner try/catch has no effect and could be removed
+				}
+			}
+			// baseline: every inserted list must be found
+			AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+			IntList target = new IntList();
+			
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue(prober.getMatchFor(lists[i], target));
+				assertArrayEquals(lists[i].getValue(), target.getValue());
+			}
+			// presumably the table keeps this same 'memory' list (not a copy), so appended pages become usable by it; the final free-page count depends on that
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+			Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable"); // called reflectively by name: renaming the method breaks this only at runtime
+			assertTrue(b.booleanValue());
+			// all lists must survive the first resize
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue(prober.getMatchFor(lists[i], target));
+				assertArrayEquals(lists[i].getValue(), target.getValue());
+			}
+			// replacement records; presumably they share keys with 'lists' (getRandomizedIntLists is defined elsewhere -- confirm)
+			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+			
+			// test replacing
+			IntList tempHolder = new IntList();
+			for (int i = 0; i < NUM_LISTS; i++) {
+				table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+			}
+			// compact every partition reflectively; the cast below assumes the 'partitions' field is an ArrayList<InMemoryPartition<IntList>>
+			Field list = Whitebox.getField(CompactingHashTable.class, "partitions"); // NOTE(review): 'list' holds a Field, not a list; a name like partitionsField would be clearer
+			@SuppressWarnings("unchecked")
+			ArrayList<InMemoryPartition<IntList>> partitions = (ArrayList<InMemoryPartition<IntList>>) list.get(table);
+			int numPartitions = partitions.size();
+			for(int i = 0; i < numPartitions; i++) {
+				Whitebox.invokeMethod(table, "compactPartition", i);
+			}
+			// second resize, now over compacted partitions
+			// make sure there is enough memory for resize
+			memory.addAll(getMemory(2*ADDITIONAL_MEM, PAGE_SIZE));
+			b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+			assertTrue(b.booleanValue());									
+			// the overwrites must survive compaction plus the second resize
+			for (int i = 0; i < NUM_LISTS; i++) {
+				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+			}
+			// expected free pages: NUM_MEM_PAGES + (1 + 2) * ADDITIONAL_MEM
+			table.close();
+			assertEquals("Memory lost", NUM_MEM_PAGES + 3*ADDITIONAL_MEM, table.getFreeMemory().size());
+		} catch (Exception e) { // NOTE(review): declaring "throws Exception" would surface the original exception and stack trace in the report instead of only its message
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}
@@ -352,7 +667,6 @@ public class MemoryHashTableTest {
 	
 	@Test
 	public void testVariableLengthStringBuildAndRetrieve() {
-		
 		try {
 			final int NUM_MEM_PAGES = 40 * NUM_PAIRS / PAGE_SIZE;
 			
@@ -364,13 +678,6 @@ public class MemoryHashTableTest {
 
 			MutableObjectIterator<StringPair> updateTester = new UniformStringPairGenerator(NUM_PAIRS, 1, false);
 			
-			//long start = 0L;
-			//long end = 0L;
-			
-			//long first = System.currentTimeMillis();
-			
-			//System.out.println("Creating and filling CompactingHashMap...");
-			//start = System.currentTimeMillis();
 			AbstractMutableHashTable<StringPair> table = new CompactingHashTable<StringPair>(serializerS, comparatorS, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
 			table.open();
 			
@@ -378,47 +685,27 @@ public class MemoryHashTableTest {
 			while(buildInput.next(target) != null) {
 				table.insert(target);
 			}
-			//end = System.currentTimeMillis();
-			//System.out.println("HashMap ready. Time: " + (end-start) + " ms");
-			
-			//System.out.println("Starting first probing run...");
-			//start = System.currentTimeMillis();
 
 			AbstractHashTableProber<StringPair, StringPair> prober = table.getProber(comparatorS, pairComparatorS);
 			StringPair temp = new StringPair();
 			while(probeTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertTrue("" + target.getKey(), prober.getMatchFor(target, temp));
 				assertEquals(temp.getValue(), target.getValue());
 			}
-			//end = System.currentTimeMillis();
-			//System.out.println("Probing done. Time: " + (end-start) + " ms");
 			
-			//System.out.println("Starting update...");
-			//start = System.currentTimeMillis();
 			while(updater.next(target) != null) {
 				target.setValue(target.getValue());
 				table.insertOrReplaceRecord(target, temp);
 			}
-			//end = System.currentTimeMillis();
-			//System.out.println("Update done. Time: " + (end-start) + " ms");
 			
-			//System.out.println("Starting second probing run...");
-			//start = System.currentTimeMillis();
 			while (updateTester.next(target) != null) {
 				assertTrue(prober.getMatchFor(target, temp));
 				assertEquals(target.getValue(), temp.getValue());
 			}
-			//end = System.currentTimeMillis();
-			//System.out.println("Probing done. Time: " + (end-start) + " ms");
 			
 			table.close();
-			
-			//end = System.currentTimeMillis();
-			//System.out.println("Overall time: " + (end-first) + " ms");
-			
 			assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
 		}


Mime
View raw message