flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-2964] [runtime] Fix broken spilling of MutableHashTable
Date Wed, 04 Nov 2015 10:33:22 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 e83bf74f1 -> f1bf28237


[FLINK-2964] [runtime] Fix broken spilling of MutableHashTable

The HashPartition did not count properly the number of occupied memory
segments, because it excluded the memory segments of the
BuildSideBuffer. That caused the MutableHashTable to fail when trying to
spill a partition which did not have any overflow segments. This PR
fixes the problem by also counting the memory segments of the
BuildSideBuffer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1bf2823
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1bf2823
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1bf2823

Branch: refs/heads/release-0.10
Commit: f1bf28237540987d99592d90dbba978460c2a97e
Parents: e83bf74
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Nov 4 02:17:12 2015 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Nov 4 11:32:58 2015 +0100

----------------------------------------------------------------------
 .../runtime/operators/hash/HashPartition.java   |   8 +-
 .../runtime/operators/hash/HashTableTest.java   | 124 +++++++++++++++++++
 2 files changed, 131 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1bf2823/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 32fd74a..97bef4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -207,7 +207,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 */
 	public int getNumOccupiedMemorySegments() {
 		// either the number of memory segments, or one for spilling
-		final int numPartitionBuffers = this.partitionBuffers != null ? this.partitionBuffers.length : 1;
+		final int numPartitionBuffers = this.partitionBuffers != null ?
+			this.partitionBuffers.length : this.buildSideWriteBuffer.getNumOccupiedMemorySegments();
 		return numPartitionBuffers + numOverflowSegments;
 	}
 	
@@ -541,6 +542,11 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		int getBlockCount() {
 			return this.currentBlockNumber + 1;
 		}
+
+		int getNumOccupiedMemorySegments() {
+			// return the current segment + all filled segments
+			return this.targetList.size() + 1;
+		}
 		
 		int spill(BlockChannelWriter<MemorySegment> writer) throws IOException {
 			this.writer = writer;

http://git-wip-us.apache.org/repos/asf/flink/blob/f1bf2823/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 92adc2a..6ef6d47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.operators.hash;
 
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
@@ -37,10 +39,13 @@ import org.apache.flink.types.ByteValue;
 import org.apache.flink.util.MutableObjectIterator;
 
 import org.junit.Test;
+import org.junit.Assert;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.*;
@@ -191,6 +196,64 @@ public class HashTableTest {
 			ioMan.shutdown();
 		}
 	}
+
+	/**
+	 * Tests that the MutableHashTable spills its partitions when creating the initial table
+	 * without overflow segments in the partitions. This means that the records are large.
+	 */
+	@Test
+	public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception {
+		final IOManager ioMan = new IOManagerAsync();
+
+		final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
+		final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
+		final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
+
+		@SuppressWarnings("unchecked")
+		final TypePairComparator<byte[], byte[]> pairComparator = new GenericPairComparator<>(
+			new BytePrimitiveArrayComparator(true), new BytePrimitiveArrayComparator(true));
+
+		final int pageSize = 128;
+		final int numSegments = 33;
+
+		List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+		MutableHashTable<byte[], byte[]> table = new MutableHashTable<byte[], byte[]>(
+			serializer,
+			serializer,
+			buildComparator,
+			probeComparator,
+			pairComparator,
+			memory,
+			ioMan,
+			1,
+			false);
+
+		int numElements = 9;
+
+		table.open(
+			new CombiningIterator<byte[]>(
+				new ByteArrayIterator(numElements, 128,(byte) 0),
+				new ByteArrayIterator(numElements, 128,(byte) 1)),
+			new CombiningIterator<byte[]>(
+				new ByteArrayIterator(1, 128,(byte) 0),
+				new ByteArrayIterator(1, 128,(byte) 1)));
+
+		while(table.nextRecord()) {
+			MutableHashTable.HashBucketIterator<byte[], byte[]> iterator = table.getBuildSideIterator();
+
+			int counter = 0;
+
+			while(iterator.next() != null) {
+				counter++;
+			}
+
+			// check that we retrieve all our elements
+			Assert.assertEquals(numElements, counter);
+		}
+
+		table.close();
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities
@@ -241,6 +304,34 @@ public class HashTableTest {
 		}
 	}
 
+	private static class ByteArrayIterator implements MutableObjectIterator<byte[]> {
+
+		private final long numRecords;
+		private long counter = 0;
+		private final byte[] arrayValue;
+
+
+		ByteArrayIterator(long numRecords, int length, byte value) {
+			this.numRecords = numRecords;
+			arrayValue = new byte[length];
+			Arrays.fill(arrayValue, value);
+		}
+
+		@Override
+		public byte[] next(byte[] array) {
+			return next();
+		}
+
+		@Override
+		public byte[] next() {
+			if (counter++ < numRecords) {
+				return arrayValue;
+			} else {
+				return null;
+			}
+		}
+	}
+
 	private static class LongIterator implements MutableObjectIterator<Long> {
 
 		private final long numRecords;
@@ -288,4 +379,37 @@ public class HashTableTest {
 			}
 		}
 	}
+
+	private static class CombiningIterator<T> implements MutableObjectIterator<T> {
+
+		private final MutableObjectIterator<T> left;
+		private final MutableObjectIterator<T> right;
+
+		public CombiningIterator(MutableObjectIterator<T> left, MutableObjectIterator<T> right) {
+			this.left = left;
+			this.right = right;
+		}
+
+		@Override
+		public T next(T reuse) throws IOException {
+			T value = left.next(reuse);
+
+			if (value == null) {
+				return right.next(reuse);
+			} else {
+				return value;
+			}
+		}
+
+		@Override
+		public T next() throws IOException {
+			T value = left.next();
+
+			if (value == null) {
+				return right.next();
+			} else {
+				return value;
+			}
+		}
+	}
 }


Mime
View raw message