flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] flink git commit: [FLINK-2085] [runtime] Add an option to let the MemoryManager allocate and release memory as needed.
Date Tue, 26 May 2015 14:23:19 GMT
[FLINK-2085] [runtime] Add an option to let the MemoryManager allocate and release memory as needed.

This is an alternative mode to the current mode that pre-allocates all memory.

The default remains to pre-allocate all memory.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea60678e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea60678e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea60678e

Branch: refs/heads/master
Commit: ea60678e9ecdcf954cddf0ce6d0e0b6da01b4cbc
Parents: 924830f
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri May 22 18:35:40 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200

----------------------------------------------------------------------
 .../memorymanager/DefaultMemoryManager.java     | 138 ++++++++----
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |   2 +-
 .../io/disk/FileChannelStreamsITCase.java       |   2 +-
 .../runtime/io/disk/FileChannelStreamsTest.java |   4 +-
 .../disk/SeekableFileChannelInputViewTest.java  |   2 +-
 .../io/disk/iomanager/IOManagerITCase.java      |   4 +-
 .../IOManagerPerformanceBenchmark.java          |   4 +-
 .../memory/DefaultMemoryManagerTest.java        | 197 -----------------
 .../memory/MemoryManagerLazyAllocationTest.java | 209 +++++++++++++++++++
 .../flink/runtime/memory/MemoryManagerTest.java | 208 ++++++++++++++++++
 .../flink/runtime/memory/MemorySegmentTest.java |   4 +-
 .../operators/drivers/TestTaskContext.java      |   2 +-
 .../NonReusingReOpenableHashTableITCase.java    |  10 +-
 .../hash/ReusingReOpenableHashTableITCase.java  |  10 +-
 .../sort/LargeRecordHandlerITCase.java          |   4 +-
 .../operators/sort/LargeRecordHandlerTest.java  |   6 +-
 ...askManagerComponentsStartupShutdownTest.java |   2 +-
 18 files changed, 539 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 28ebe13..b041ac9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -66,16 +66,21 @@ public class DefaultMemoryManager implements MemoryManager {
 	private final int pageSizeBits;			// the number of bits that the power-of-two page size corresponds to
 	
 	private final int totalNumPages;		// The initial total size, for verification.
-	
-	private boolean isShutDown;				// flag whether the close() has already been invoked.
-
-	/**
-	 * Number of slots of the task manager
-	 */
-	private final int numberOfSlots;
 
+	/** The total size of the memory managed by this memory manager */
 	private final long memorySize;
 
+	/** Number of slots of the task manager */
+	private final int numberOfSlots;
+	
+	private final boolean isPreAllocated;
+	
+	/** The number of memory pages that have not been allocated and are available for lazy allocation */
+	private int numNonAllocatedPages;
+	
+	/** flag whether the close() has already been invoked */
+	private boolean isShutDown;
+
 	// ------------------------------------------------------------------------
 	// Constructors / Destructors
 	// ------------------------------------------------------------------------
@@ -87,7 +92,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * @param numberOfSlots The number of slots of the task manager.
 	 */
 	public DefaultMemoryManager(long memorySize, int numberOfSlots) {
-		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
+		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE, true);
 	}
 
 	/**
@@ -96,8 +101,10 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
 	 * @param numberOfSlots The number of slots of the task manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
+	 * @param preAllocateMemory True, if the memory manager should immediately allocate all memory, false
+	 *                          if it should allocate and release the memory as needed.
 	 */
-	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
+	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize, boolean preAllocateMemory) {
 		// sanity checks
 		if (memorySize <= 0) {
 			throw new IllegalArgumentException("Size of total memory must be positive.");
@@ -132,11 +139,17 @@ public class DefaultMemoryManager implements MemoryManager {
 		this.freeSegments = new ArrayDeque<byte[]>(this.totalNumPages);
 		this.allocatedSegments = new HashMap<AbstractInvokable, Set<DefaultMemorySegment>>();
 
+		this.isPreAllocated = preAllocateMemory;
 		
-		// add the full chunks
-		for (int i = 0; i < this.totalNumPages; i++) {
-			// allocate memory of the specified size
-			this.freeSegments.add(new byte[this.pageSize]);
+		if (preAllocateMemory) {
+			// add the full chunks
+			for (int i = 0; i < this.totalNumPages; i++) {
+				// allocate memory of the specified size
+				this.freeSegments.add(new byte[this.pageSize]);
+			}
+		}
+		else {
+			this.numNonAllocatedPages = this.totalNumPages;
 		}
 	}
 
@@ -147,12 +160,14 @@ public class DefaultMemoryManager implements MemoryManager {
 		{
 			if (!this.isShutDown) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Shutting down MemoryManager instance " + toString());
+					LOG.debug("Shutting down MemoryManager instance " + this);
 				}
 	
 				// mark as shutdown and release memory
 				this.isShutDown = true;
+				
 				this.freeSegments.clear();
+				this.numNonAllocatedPages = 0;
 				
 				// go over all allocated segments and release them
 				for (Set<DefaultMemorySegment> segments : this.allocatedSegments.values()) {
@@ -173,7 +188,9 @@ public class DefaultMemoryManager implements MemoryManager {
 	@Override
 	public boolean verifyEmpty() {
 		synchronized (this.lock) {
-			return this.freeSegments.size() == this.totalNumPages;
+			return isPreAllocated ?
+					this.freeSegments.size() == this.totalNumPages :
+					this.numNonAllocatedPages == this.totalNumPages;
 		}
 	}
 
@@ -209,7 +226,9 @@ public class DefaultMemoryManager implements MemoryManager {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
 			
-			if (numPages > this.freeSegments.size()) {
+			// in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the
+			// lazy case, the 'freeSegments.size()' is zero.
+			if (numPages > (this.freeSegments.size() + numNonAllocatedPages)) {
 				throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " + 
 					this.freeSegments.size() + " pages are remaining.");
 			}
@@ -220,11 +239,22 @@ public class DefaultMemoryManager implements MemoryManager {
 				this.allocatedSegments.put(owner, segmentsForOwner);
 			}
 			
-			for (int i = numPages; i > 0; i--) {
-				byte[] buffer = this.freeSegments.poll();
-				final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
-				target.add(segment);
-				segmentsForOwner.add(segment);
+			if (isPreAllocated) {
+				for (int i = numPages; i > 0; i--) {
+					byte[] buffer = this.freeSegments.poll();
+					final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+					target.add(segment);
+					segmentsForOwner.add(segment);
+				}
+			}
+			else {
+				for (int i = numPages; i > 0; i--) {
+					byte[] buffer = new byte[pageSize];
+					final DefaultMemorySegment segment = new DefaultMemorySegment(owner, buffer);
+					target.add(segment);
+					segmentsForOwner.add(segment);
+				}
+				numNonAllocatedPages -= numPages;
 			}
 		}
 		// -------------------- END CRITICAL SECTION -------------------
@@ -259,14 +289,19 @@ public class DefaultMemoryManager implements MemoryManager {
 						this.allocatedSegments.remove(owner);
 					}
 				}
+
+				byte[] buffer = defSeg.destroy();
+				
+				if (isPreAllocated) {
+					// pre-allocated mode: return the buffer to the pool of free segments
+					this.freeSegments.add(buffer);
+				}
+				else {
+					numNonAllocatedPages++;
+				}
 			}
 			catch (Throwable t) {
-				LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
-			}
-			finally {
-				// release the memory in any case
-				byte[] buffer = defSeg.destroy();
-				this.freeSegments.add(buffer);
+				throw new RuntimeException("Error removing book-keeping reference to allocated memory segment.", t);
 			}
 		}
 		// -------------------- END CRITICAL SECTION -------------------
@@ -286,8 +321,7 @@ public class DefaultMemoryManager implements MemoryManager {
 			}
 
 			// since concurrent modifications to the collection
-			// can disturb the release, we need to try potentially
-			// multiple times
+			// can disturb the release, we need to try potentially multiple times
 			boolean successfullyReleased = false;
 			do {
 				final Iterator<T> segmentsIterator = segments.iterator();
@@ -322,12 +356,20 @@ public class DefaultMemoryManager implements MemoryManager {
 									this.allocatedSegments.remove(owner);
 								}
 							}
-						} catch (Throwable t) {
-							LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
-						} finally {
+
 							// release the memory in any case
 							byte[] buffer = defSeg.destroy();
-							this.freeSegments.add(buffer);
+							
+							if (isPreAllocated) {
+								this.freeSegments.add(buffer);
+							}
+							else {
+								numNonAllocatedPages++;
+							}
+						}
+						catch (Throwable t) {
+							throw new RuntimeException(
+									"Error removing book-keeping reference to allocated memory segment.", t);
 						}
 					}
 
@@ -363,9 +405,17 @@ public class DefaultMemoryManager implements MemoryManager {
 			}
 
 			// free each segment
-			for (DefaultMemorySegment seg : segments) {
-				final byte[] buffer = seg.destroy();
-				this.freeSegments.add(buffer);
+			if (isPreAllocated) {
+				for (DefaultMemorySegment seg : segments) {
+					final byte[] buffer = seg.destroy();
+					this.freeSegments.add(buffer);
+				}
+			}
+			else {
+				for (DefaultMemorySegment seg : segments) {
+					seg.destroy();
+				}
+				numNonAllocatedPages += segments.size();
 			}
 
 			segments.clear();
@@ -387,12 +437,16 @@ public class DefaultMemoryManager implements MemoryManager {
 
 	@Override
 	public int computeNumberOfPages(double fraction) {
-		return getRelativeNumPages(fraction);
+		if (fraction <= 0 || fraction > 1) {
+			throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
+		}
+
+		return (int)(this.totalNumPages * fraction / this.numberOfSlots);
 	}
 
 	@Override
 	public long computeMemorySize(double fraction) {
-		return this.pageSize*computeNumberOfPages(fraction);
+		return this.pageSize * computeNumberOfPages(fraction);
 	}
 
 	@Override
@@ -414,14 +468,6 @@ public class DefaultMemoryManager implements MemoryManager {
 			throw new IllegalArgumentException("The given number of bytes corresponds to more than MAX_INT pages.");
 		}
 	}
-
-	private int getRelativeNumPages(double fraction){
-		if (fraction <= 0 || fraction > 1) {
-			throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
-		}
-
-		return (int)(this.totalNumPages * fraction / this.numberOfSlots);
-	}
 	
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 65ce7dc..a95b5cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1396,8 +1396,10 @@ object TaskManager {
     val memoryManager = try {
       new DefaultMemoryManager(memorySize,
                                taskManagerConfig.numberOfSlots,
-                               netConfig.networkBufferSize)
-    } catch {
+                               netConfig.networkBufferSize,
+                               true)
+    }
+    catch {
       case e: OutOfMemoryError => throw new Exception(
         "OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
           memorySize + " bytes).", e)

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 0462b3f..f2a5e2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -78,7 +78,7 @@ public class ChannelViewsTest
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 85d2113..dcc1e5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -66,7 +66,7 @@ public class FileChannelStreamsITCase {
 
 	@Before
 	public void beforeTest() {
-		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE);
+		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE, true);
 		ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1f6899d..f9b8b38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -44,7 +44,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteOutputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
@@ -78,7 +78,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteInputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index f090ef1..c071bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -45,7 +45,7 @@ public class SeekableFileChannelInputViewTest {
 		// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
 		
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE);
+			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 9abedb3..a0a8356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 
 /**
@@ -82,7 +82,7 @@ public class IOManagerITCase {
 	@SuppressWarnings("unchecked")
 	public void parallelChannelsTest() throws Exception {
 		final Random rnd = new Random(SEED);
-		final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
+		final AbstractInvokable memOwner = new MemoryManagerTest.DummyInvokable();
 		
 		FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
 		BlockChannelWriter<MemorySegment>[] writers = new BlockChannelWriter[NUM_CHANNELS];

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index c9ca9fb..c71e181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
+import org.apache.flink.runtime.memory.MemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +62,7 @@ public class IOManagerPerformanceBenchmark {
 	private static final int NUM_INTS_WRITTEN = 100000000;
 	
 	
-	private static final AbstractInvokable memoryOwner = new DefaultMemoryManagerTest.DummyInvokable();
+	private static final AbstractInvokable memoryOwner = new MemoryManagerTest.DummyInvokable();
 	
 	private DefaultMemoryManager memManager;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
deleted file mode 100644
index afd40a2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/DefaultMemoryManagerTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.memory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DefaultMemoryManagerTest
-{
-	private static final long RANDOM_SEED = 643196033469871L;
-
-	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
-
-	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-	
-	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
-
-	private DefaultMemoryManager memoryManager;
-
-	private Random random;
-
-	@Before
-	public void setUp()
-	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
-		this.random = new Random(RANDOM_SEED);
-	}
-
-	@After
-	public void tearDown()
-	{
-		if (!this.memoryManager.verifyEmpty()) {
-			Assert.fail("Memory manager is not complete empty and valid at the end of the test.");
-		}
-		this.memoryManager = null;
-		this.random = null;
-	}
-
-	@Test
-	public void allocateAllSingle() throws Exception
-	{
-		final AbstractInvokable mockInvoke = new DummyInvokable();
-		List<MemorySegment> segments = new ArrayList<MemorySegment>();
-		
-		try {
-			for (int i = 0; i < NUM_PAGES; i++) {
-				segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
-			}
-		} catch (MemoryAllocationException e) {
-			Assert.fail("Unable to allocate memory");
-		}
-		
-		this.memoryManager.release(segments);
-	}
-	
-	@Test
-	public void allocateAllMulti() throws Exception
-	{
-		final AbstractInvokable mockInvoke = new DummyInvokable();
-		final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-		
-		try {
-			for(int i = 0; i < NUM_PAGES / 2; i++) {
-				segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
-			}
-		} catch (MemoryAllocationException e) {
-			Assert.fail("Unable to allocate memory");
-		}
-		
-		this.memoryManager.release(segments);
-	}
-	
-	@Test
-	public void allocateMultipleOwners()
-	{
-		final int NUM_OWNERS = 17;
-	
-		try {
-			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-			@SuppressWarnings("unchecked")
-			List<MemorySegment>[] mems = new List[NUM_OWNERS];
-			
-			for (int i = 0; i < NUM_OWNERS; i++) {
-				owners[i] = new DummyInvokable();
-				mems[i] = new ArrayList<MemorySegment>(64);
-			}
-			
-			// allocate all memory to the different owners
-			for (int i = 0; i < NUM_PAGES; i++) {
-				final int owner = this.random.nextInt(NUM_OWNERS);
-				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
-			}
-			
-			// free one owner at a time
-			for (int i = 0; i < NUM_OWNERS; i++) {
-				this.memoryManager.releaseAll(owners[i]);
-				owners[i] = null;
-				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
-				mems[i] = null;
-				
-				// check that the owner owners were not affected
-				for (int k = i+1; k < NUM_OWNERS; k++) {
-					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void allocateTooMuch()
-	{
-		try {
-			final AbstractInvokable mockInvoke = new DummyInvokable();
-			
-			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-			
-			try {
-				this.memoryManager.allocatePages(mockInvoke, 1);
-				Assert.fail("Expected MemoryAllocationException.");
-			} catch (MemoryAllocationException maex) {
-				// expected
-			}
-			
-			Assert.assertTrue("The previously allocated segments were not valid any more.",
-																	allMemorySegmentsValid(segs));
-			
-			this.memoryManager.releaseAll(mockInvoke);			
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test encountered an exception: " + e.getMessage());
-		}
-	}
-	
-	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs)
-	{
-		for (MemorySegment seg : memSegs) {
-			if (seg.isFreed()) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs)
-	{
-		for (MemorySegment seg : memSegs) {
-			if (!seg.isFreed()) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	/**
-	 * Utility class to serve as owner for the memory.
-	 */
-	public static final class DummyInvokable extends AbstractInvokable {
-		@Override
-		public void registerInputOutput() {}
-
-		@Override
-		public void invoke() throws Exception {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
new file mode 100644
index 0000000..e077d08
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the memory manager, in the mode where it allocates and releases memory lazily, as needed.
+ */
+public class MemoryManagerLazyAllocationTest {
+	
+	private static final long RANDOM_SEED = 643196033469871L;
+
+	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+	
+	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+	private DefaultMemoryManager memoryManager;
+
+	private Random random;
+
+	
+	@Before
+	public void setUp() {
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, false);
+		this.random = new Random(RANDOM_SEED);
+	}
+
+	@After
+	public void tearDown() {
+		if (!this.memoryManager.verifyEmpty()) {
+			fail("Memory manager is not complete empty and valid at the end of the test.");
+		}
+		this.memoryManager = null;
+		this.random = null;
+	}
+
+	@Test
+	public void allocateAllSingle() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for (int i = 0; i < NUM_PAGES; i++) {
+					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+				}
+			}
+			catch (MemoryAllocationException e) {
+				fail("Unable to allocate memory");
+			}
+			
+			for (MemorySegment seg : segments) {
+				this.memoryManager.release(seg);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateAllMulti() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for(int i = 0; i < NUM_PAGES / 2; i++) {
+					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+				}
+			} catch (MemoryAllocationException e) {
+				Assert.fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateMultipleOwners() {
+		final int NUM_OWNERS = 17;
+	
+		try {
+			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+			
+			@SuppressWarnings("unchecked")
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+			
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				owners[i] = new DummyInvokable();
+				mems[i] = new ArrayList<MemorySegment>(64);
+			}
+			
+			// allocate all memory to the different owners
+			for (int i = 0; i < NUM_PAGES; i++) {
+				final int owner = this.random.nextInt(NUM_OWNERS);
+				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+			}
+			
+			// free one owner at a time
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				this.memoryManager.releaseAll(owners[i]);
+				owners[i] = null;
+				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+				mems[i] = null;
+				
+				// check that the other owners were not affected
+				for (int k = i+1; k < NUM_OWNERS; k++) {
+					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateTooMuch() {
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			
+			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+			
+			try {
+				this.memoryManager.allocatePages(mockInvoke, 1);
+				Assert.fail("Expected MemoryAllocationException.");
+			} catch (MemoryAllocationException maex) {
+				// expected
+			}
+			
+			Assert.assertTrue("The previously allocated segments were not valid any more.",
+																	allMemorySegmentsValid(segs));
+			
+			this.memoryManager.releaseAll(mockInvoke);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
+		for (MemorySegment seg : memSegs) {
+			if (!seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	/**
+	 * Utility class to serve as owner for the memory.
+	 */
+	public static final class DummyInvokable extends AbstractInvokable {
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
new file mode 100644
index 0000000..c211357
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the memory manager, in the mode where it pre-allocates all memory.
+ */
+public class MemoryManagerTest {
+	
+	private static final long RANDOM_SEED = 643196033469871L;
+
+	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
+
+	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
+	
+	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
+
+	private DefaultMemoryManager memoryManager;
+
+	private Random random;
+
+	
+	@Before
+	public void setUp() { // builds a fresh memory manager and a fixed-seed Random before each test
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true); // NOTE(review): trailing 'true' presumably selects pre-allocation — confirm against the constructor
+		this.random = new Random(RANDOM_SEED);
+	}
+
+	@After
+	public void tearDown() { // fails the test if verifyEmpty() returns false, then drops the fixtures
+		if (!this.memoryManager.verifyEmpty()) {
+			fail("Memory manager is not complete empty and valid at the end of the test.");
+		}
+		this.memoryManager = null;
+		this.random = null;
+	}
+
+	@Test
+	public void allocateAllSingle() { // allocates NUM_PAGES pages one request at a time, then releases them as one list
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for (int i = 0; i < NUM_PAGES; i++) {
+					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
+				}
+			}
+			catch (MemoryAllocationException e) {
+				fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateAllMulti() { // allocates pages in NUM_PAGES / 2 requests of two pages each, then releases them as one list
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
+			
+			try {
+				for(int i = 0; i < NUM_PAGES / 2; i++) {
+					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
+				}
+			} catch (MemoryAllocationException e) {
+				Assert.fail("Unable to allocate memory");
+			}
+			
+			this.memoryManager.release(segments);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateMultipleOwners() { // spreads all pages over NUM_OWNERS owners, then releases owner by owner
+		final int NUM_OWNERS = 17;
+	
+		try {
+			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
+			
+			@SuppressWarnings("unchecked")
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
+			
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				owners[i] = new DummyInvokable();
+				mems[i] = new ArrayList<MemorySegment>(64);
+			}
+			
+			// allocate all memory, one page at a time, to randomly chosen owners
+			for (int i = 0; i < NUM_PAGES; i++) {
+				final int owner = this.random.nextInt(NUM_OWNERS);
+				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
+			}
+			
+			// free one owner at a time
+			for (int i = 0; i < NUM_OWNERS; i++) {
+				this.memoryManager.releaseAll(owners[i]);
+				owners[i] = null;
+				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
+				mems[i] = null;
+				
+				// check that the other (not yet released) owners were not affected
+				for (int k = i+1; k < NUM_OWNERS; k++) {
+					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void allocateTooMuch() { // requesting one page beyond NUM_PAGES must fail and leave the earlier segments intact
+		try {
+			final AbstractInvokable mockInvoke = new DummyInvokable();
+			
+			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
+			
+			try {
+				this.memoryManager.allocatePages(mockInvoke, 1);
+				Assert.fail("Expected MemoryAllocationException.");
+			} catch (MemoryAllocationException maex) {
+				// expected: NUM_PAGES pages are already held, so the extra page request must be rejected
+			}
+			
+			Assert.assertTrue("The previously allocated segments were not valid any more.",
+																	allMemorySegmentsValid(segs));
+			
+			this.memoryManager.releaseAll(mockInvoke);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) { // true iff no segment in the list reports isFreed()
+		for (MemorySegment seg : memSegs) {
+			if (seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) { // true iff every segment in the list reports isFreed()
+		for (MemorySegment seg : memSegs) {
+			if (!seg.isFreed()) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	/**
+	 * Utility class to serve as owner for the memory; both overridden methods are intentionally empty.
+	 */
+	public static final class DummyInvokable extends AbstractInvokable {
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
index cefd0c5..f9adb94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentTest.java
@@ -51,8 +51,8 @@ public class MemorySegmentTest {
 	@Before
 	public void setUp() throws Exception{
 		try {
-			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE);
-			this.segment = manager.allocatePages(new DefaultMemoryManagerTest.DummyInvokable(), 1).get(0);
+			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, true);
+			this.segment = manager.allocatePages(new MemoryManagerTest.DummyInvokable(), 1).get(0);
 			this.random = new Random(RANDOM_SEED);
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 02bffec..b1466c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -69,7 +69,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	public TestTaskContext() {}
 	
 	public TestTaskContext(long memoryInBytes) {
-		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024);
+		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024, true);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 5012d1e..f5105bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -101,10 +101,9 @@ public class NonReusingReOpenableHashTableITCase {
 	private TypeComparator<Record> recordProbeSideComparator;
 	private TypePairComparator<Record, Record> pactRecordComparator;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.recordSerializer = RecordSerializer.get();
 
 		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
@@ -121,13 +120,12 @@ public class NonReusingReOpenableHashTableITCase {
 		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index fd2f906..7172887 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -100,10 +100,9 @@ public class ReusingReOpenableHashTableITCase {
 	private TypeComparator<Record> recordProbeSideComparator;
 	private TypePairComparator<Record, Record> pactRecordComparator;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.recordSerializer = RecordSerializer.get();
 		
 		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
@@ -120,13 +119,12 @@ public class ReusingReOpenableHashTableITCase {
 		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
 		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 498cb61..429062b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -62,7 +62,7 @@ public class LargeRecordHandlerITCase {
 		final int NUM_RECORDS = 10;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -203,7 +203,7 @@ public class LargeRecordHandlerITCase {
 		FileIOChannel.ID channel = null;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index 6eb736f..423ff9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -49,7 +49,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_PAGES = 50;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
 			
@@ -101,7 +101,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -189,7 +189,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea60678e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index dca3c58..7d4994d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -85,7 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 
-			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE);
+			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf);
 			final int numberOfSlots = 1;


Mime
View raw message