flink-commits mailing list archives

From: se...@apache.org
Subject: [1/4] flink git commit: [FLINK-2076] [runtime] Fix memory leakage in MutableHashTable
Date: Thu, 04 Jun 2015 11:28:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0081fb2ef -> 1559701f4


[FLINK-2076] [runtime] Fix memory leakage in MutableHashTable

This closes #751


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1559701f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1559701f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1559701f

Branch: refs/heads/master
Commit: 1559701f451e76944ba016f942c22fa8a022bcde
Parents: 7411343
Author: Chiwan Park <chiwanpark@icloud.com>
Authored: Sun May 31 03:27:17 2015 +0900
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jun 4 13:27:22 2015 +0200

----------------------------------------------------------------------
 .../operators/hash/MutableHashTable.java        |  6 +-
 .../runtime/operators/hash/HashTableITCase.java | 76 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1559701f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index c0be611..21d67a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -513,7 +513,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memory.add(getNextBuffer());
-			memory.add(getNextBuffer());
+			MemorySegment nextBuffer = getNextBuffer();
+			if (nextBuffer != null) {
+				memory.add(nextBuffer);
+			}
 
 			ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
 				returnQueue, memory, this.availableMemory, this.probeSideSerializer, p.getProbeSideBlockCount());
@@ -652,6 +655,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				throw new RuntimeException("Hashtable closing was interrupted");
 			}
 		}
+		this.writeBehindBuffersAvailable = 0;
 	}
 	
 	public void abort() {
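
The change above does two things. When the table prepares to re-read a spilled probe-side partition, it requests a second read buffer via getNextBuffer(); with a very small memory budget that call can return null, and the old code added the result to the read memory list unconditionally, so a null could end up among the segments that are later released. The fix adds the buffer only when one was actually obtained, and close() now also resets writeBehindBuffersAvailable so that a stale availability count cannot carry over into a later open(). Below is a minimal, self-contained sketch of that pattern; the Pool class is made up for illustration and mimics the real getNextBuffer() only as far as the diff shows (it may return null when nothing is left):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a tiny buffer pool that, like the hash table's buffer source,
// may return null when it runs out of segments.
public class BufferPoolSketch {

	static final class Pool {
		private final ArrayDeque<byte[]> free = new ArrayDeque<byte[]>();
		private int available;

		Pool(int numBuffers, int bufferSize) {
			for (int i = 0; i < numBuffers; i++) {
				free.add(new byte[bufferSize]);
			}
			available = numBuffers;
		}

		// Returns the next free buffer, or null if the pool is exhausted.
		byte[] getNextBuffer() {
			if (available == 0) {
				return null;
			}
			available--;
			return free.poll();
		}

		void release(byte[] buffer) {
			free.add(buffer);
			available++;
		}

		// Analogous to the second part of the fix: forget any stale availability
		// count when the consumer is closed.
		void close() {
			available = 0;
			free.clear();
		}
	}

	public static void main(String[] args) {
		Pool pool = new Pool(1, 32 * 1024);           // deliberately too small for two buffers

		List<byte[]> readMemory = new ArrayList<byte[]>();
		readMemory.add(pool.getNextBuffer());         // the first buffer is available here

		// The fixed code path: only add the second buffer if one was actually handed out.
		byte[] next = pool.getNextBuffer();
		if (next != null) {
			readMemory.add(next);
		}

		System.out.println("buffers acquired: " + readMemory.size());  // prints 1, no null in the list

		for (byte[] buffer : readMemory) {
			pool.release(buffer);
		}
		pool.close();
	}
}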

http://git-wip-us.apache.org/repos/asf/flink/blob/1559701f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index c6e3062..233fa4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -1400,6 +1400,82 @@ public class HashTableITCase {
 		
 		this.memManager.release(join.getFreedMemory());
 	}
+
+	/*
+	 * This test is the same as `testInMemoryReOpen()`, only the number of keys and pages differ. It
+	 * validates the fix for the MutableHashTable memory leak that occurs with small memory segments.
+	 */
+	@Test
+	public void testInMemoryReOpenWithSmallMemory() throws Exception {
+		final int NUM_KEYS = 10000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 30000 pairs with 3 values sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+		// create a probe input that gives 100000 pairs with 10 values sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+				this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+				this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+				memSegments, ioManager);
+		join.open(buildInput, probeInput);
+
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+		join.close();
+
+		// ----------------------------------------------------------------------------------------
+		// recreate the inputs
+
+		// create a build input that gives 30000 pairs with 3 values sharing the same key
+		buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+		// create a probe input that gives 100000 pairs with 10 values sharing a key
+		probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+		join.open(buildInput, probeInput);
+
+		numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+		join.close();
+
+		// ----------------------------------------------------------------------------------------
+
+		this.memManager.release(join.getFreedMemory());
+	}
 	
 	// ============================================================================================
 	
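
The added test runs the same join twice over one MutableHashTable instance with only 33 pages, i.e. it exercises the close()/open() cycle in which lost buffers would surface. A complementary guard against this kind of regression is a teardown that checks every segment came back to the memory manager; the sketch below is not part of this commit and assumes a JUnit 4 test class like HashTableITCase with a memManager field exposing a verifyEmpty() method (an assumption about the MemoryManager API):

	// Hypothetical teardown, not taken from this commit. Assumes org.junit.After and
	// org.junit.Assert are imported and that the memory manager offers verifyEmpty().
	@After
	public void ensureAllSegmentsReturned() {
		if (this.memManager != null) {
			Assert.assertTrue("Memory leak: not all memory segments were returned to the MemoryManager.",
					this.memManager.verifyEmpty());
		}
	}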

