flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant of the managed memory
Date Tue, 08 Sep 2015 18:58:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 62c2013..f40171a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.util.List;
@@ -25,9 +24,8 @@ import java.util.Random;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
-import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
@@ -43,15 +41,15 @@ import org.junit.Test;
 /**
  * 
  */
-public class FixedLengthRecordSorterTest
-{
+public class FixedLengthRecordSorterTest {
+	
 	private static final long SEED = 649180756312423613L;
 
 	private static final int MEMORY_SIZE = 1024 * 1024 * 64;
 	
 	private static final int MEMORY_PAGE_SIZE = 32 * 1024; 
 
-	private DefaultMemoryManager memoryManager;
+	private MemoryManager memoryManager;
 	
 	private TypeSerializer<IntPair> serializer;
 	
@@ -60,7 +58,7 @@ public class FixedLengthRecordSorterTest
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, MEMORY_PAGE_SIZE);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
 		this.serializer = new IntPairSerializer();
 		this.comparator = new IntPairComparator();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 429062b..e4dbfd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -39,16 +39,18 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.FileChannelOutputView;
 import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Test;
 
 public class LargeRecordHandlerITCase {
@@ -62,7 +64,7 @@ public class LargeRecordHandlerITCase {
 		final int NUM_RECORDS = 10;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
+			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -143,9 +145,7 @@ public class LargeRecordHandlerITCase {
 			fail(e.getMessage());
 		}
 		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
+			ioMan.shutdown();
 		}
 	}
 	
@@ -155,8 +155,6 @@ public class LargeRecordHandlerITCase {
 
 		private static final byte[] BUFFER = new byte[50000000];
 		
-//		private static final byte[] BUFFER = new byte[500000000];
-		
 		static {
 			for (int i = 0; i < BUFFER.length; i++) {
 				BUFFER[i] = (byte) i;
@@ -178,9 +176,9 @@ public class LargeRecordHandlerITCase {
 		@Override
 		public void read(DataInputView in) throws IOException {
 			val = in.readInt();
-			for (int i = 0; i < BUFFER.length; i++) {
+			for (byte bufferByte : BUFFER) {
 				byte b = in.readByte();
-				assertEquals(BUFFER[i], b);
+				assertEquals(bufferByte, b);
 			}
 		}
 		
@@ -192,7 +190,7 @@ public class LargeRecordHandlerITCase {
 	}
 	
 	
-//	@Test
+	@Test
 	public void fileTest() {
 		
 		final IOManager ioMan = new IOManagerAsync();
@@ -203,7 +201,7 @@ public class LargeRecordHandlerITCase {
 		FileIOChannel.ID channel = null;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
+			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
@@ -266,12 +264,10 @@ public class LargeRecordHandlerITCase {
 			if (channel != null) {
 				try {
 					ioMan.deleteChannel(channel);
-				} catch (IOException e) {}
-			}
-			
-			if (ioMan != null) {
-				ioMan.shutdown();
+				} catch (IOException ignored) {}
 			}
+
+			ioMan.shutdown();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index 423ff9a..49e1e76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -31,10 +31,11 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
@@ -49,7 +50,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_PAGES = 50;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
+			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
 			
@@ -86,9 +87,7 @@ public class LargeRecordHandlerTest {
 			fail(e.getMessage());
 		}
 		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
+			ioMan.shutdown();
 		}
 	}
 	
@@ -101,7 +100,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
+			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -174,9 +173,7 @@ public class LargeRecordHandlerTest {
 			fail(e.getMessage());
 		}
 		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
+			ioMan.shutdown();
 		}
 	}
 	
@@ -189,7 +186,7 @@ public class LargeRecordHandlerTest {
 		final int NUM_RECORDS = 25000;
 		
 		try {
-			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, true);
+			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
 			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
@@ -264,9 +261,7 @@ public class LargeRecordHandlerTest {
 			fail(e.getMessage());
 		}
 		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
+			ioMan.shutdown();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 6548052..7f6db00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -32,9 +32,12 @@ import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.Match;
+import org.apache.flink.runtime.operators.testutils.MatchRemovingJoiner;
+import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
@@ -45,7 +48,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 @SuppressWarnings("deprecation")
@@ -97,7 +104,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
 				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
 		pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
index 91609bb..7272595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index abead64..3e941dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -25,9 +25,8 @@ import java.util.Random;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
-import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.Key;
@@ -57,12 +56,12 @@ public class NormalizedKeySorterTest
 	
 	private static final int MEMORY_PAGE_SIZE = 32 * 1024; 
 
-	private DefaultMemoryManager memoryManager;
+	private MemoryManager memoryManager;
 
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, MEMORY_PAGE_SIZE);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
 	}
 
 	@After
@@ -369,8 +368,7 @@ public class NormalizedKeySorterTest
 	}
 	
 	@Test
-	public void testSortShortStringKeys() throws Exception
-	{
+	public void testSortShortStringKeys() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -418,8 +416,7 @@ public class NormalizedKeySorterTest
 	}
 	
 	@Test
-	public void testSortLongStringKeys() throws Exception
-	{
+	public void testSortLongStringKeys() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index 39316e3..d92be18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -32,9 +32,12 @@ import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.Match;
+import org.apache.flink.runtime.operators.testutils.MatchRemovingJoiner;
+import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
@@ -45,7 +48,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 @SuppressWarnings("deprecation")
@@ -97,7 +104,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
 				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
 		pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
 
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
index 779cf37..2cec393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index a085eeb..ece20ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -29,8 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.ResettablePactDriver;
@@ -100,7 +99,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 		this.perSortMem = perSortMemory;
 		this.perSortFractionMem = (double) perSortMemory / totalMem;
 		this.ioManager = new IOManagerAsync();
-		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem, 1) : null;
+		this.memManager = totalMem > 0 ? new MemoryManager(totalMem, 1) : null;
 		
 		this.inputs = new ArrayList<>();
 		this.comparators = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 12ca909..1737349 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -37,8 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.ResettablePactDriver;
@@ -104,7 +103,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 		this.perSortMem = perSortMemory;
 		this.perSortFractionMem = (double)perSortMemory/totalMem;
 		this.ioManager = new IOManagerAsync();
-		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
+		this.memManager = totalMem > 0 ? new MemoryManager(totalMem,1) : null;
 
 		this.inputs = new ArrayList<MutableObjectIterator<Record>>();
 		this.comparators = new ArrayList<TypeComparator<Record>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 5fc9bb9..972aeda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
@@ -39,8 +39,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
@@ -94,7 +93,7 @@ public class MockEnvironment implements Environment {
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
 
-		this.memManager = new DefaultMemoryManager(memorySize, 1);
+		this.memManager = new MemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
@@ -125,7 +124,7 @@ public class MockEnvironment implements Environment {
 
 				@Override
 				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+					return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
 				}
 			});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index c04d051..4662762 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.TaskConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 9600e5a..924a16b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -29,8 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.ResettablePactDriver;
@@ -106,7 +105,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 		this.perSortMem = perSortMemory;
 		this.perSortFractionMem = (double)perSortMemory/totalMem;
 		this.ioManager = new IOManagerAsync();
-		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
+		this.memManager = totalMem > 0 ? new MemoryManager(totalMem, 1) : null;
 		this.owner = new DummyInvokable();
 
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
index cbbeca0..8fbd4f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.operators.util;
 
+import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -37,7 +40,7 @@ public class BloomFilterTest {
 		int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
 		bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE));
 		int byteSize = bitsSize >>> 3;
-		MemorySegment memorySegment = new MemorySegment(new byte[byteSize]);
+		MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
 		bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
 		bloomFilter.setBitsLocation(memorySegment, 0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 7debb08..1060e55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
@@ -94,7 +94,7 @@ public class HashVsSortMiniBenchmark {
 		this.comparator2 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
 		this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 6d743d5..7857123 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 0bb4063..0a7f08d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -37,8 +38,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -87,12 +87,12 @@ public class TaskManagerComponentsStartupShutdownTest {
 					config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
 					new Tuple2<Integer, Integer>(0, 0));
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 
-			final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false);
+			final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5cfc2a2..cf1bfc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
 
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
index ac6744e..3631ba1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.Assert;
 
 import org.apache.flink.core.memory.MemorySegment;
@@ -38,7 +38,7 @@ public class DataInputOutputSerializerTest {
 		SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
-		MemorySegment segment = new MemorySegment(new byte[randomInt.length()]);
+		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(randomInt.length());
 
 		try {
 			// empty buffer, read buffer should be empty

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 475115e..8746fbb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
 RequestLeaderSessionID}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
@@ -53,7 +53,7 @@ import scala.language.postfixOps
 class TestingTaskManager(
     config: TaskManagerConfiguration,
     connectionInfo: InstanceConnectionInfo,
-    memoryManager: DefaultMemoryManager,
+    memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 2bad197..cabed14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,6 +27,7 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -336,7 +337,7 @@ public class BufferSpiller {
 							"Spilled buffer (%d bytes) is larger than page size of (%d bytes)", length, pageSize));
 				}
 
-				MemorySegment seg = new MemorySegment(new byte[pageSize]);
+				MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(pageSize);
 				
 				int segPos = 0;
 				int bytesRemaining = length;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 173e894..b78ec44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,7 +20,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -84,7 +85,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		for (int i = 0; i < numInputChannels; i++) {
 			final int channelIndex = i;
 			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
-			final SerializationDelegate<Object> delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
+			final SerializationDelegate<Object> delegate = new SerializationDelegate<Object>(new MultiplexingStreamRecordSerializer<T>(serializer));
 
 			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
 			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
@@ -98,20 +99,25 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 						when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
 								true);
 						return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-					} else if (input != null && input.isStreamRecord()) {
+					}
+					else if (input != null && input.isStreamRecord()) {
 						Object inputElement = input.getStreamRecord();
-						final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
+						final Buffer buffer = new Buffer(
+								MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
 								mock(BufferRecycler.class));
+						
 						recordSerializer.setNextBuffer(buffer);
 						delegate.setInstance(inputElement);
 						recordSerializer.addRecord(delegate);
 
 						// Call getCurrentBuffer to ensure size is set
 						return recordSerializer.getCurrentBuffer();
-					} else if (input != null && input.isEvent()) {
+					}
+					else if (input != null && input.isEvent()) {
 						AbstractEvent event = input.getEvent();
 						return EventSerializer.toBuffer(event);
-					} else {
+					}
+					else {
 						synchronized (inputQueues[channelIndex]) {
 							inputQueues[channelIndex].wait();
 							return answer(invocationOnMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 678b145..a8a989b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -50,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE).createBufferPool(100, true);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index a95839a..d4fdc59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -888,7 +889,10 @@ public class BarrierBufferTest {
 	private static BufferOrEvent createBuffer(int channel) {
 		// since we have no access to the contents, we need to use the size as an
 		// identifier to validate correctness here
-		Buffer buf = new Buffer(new MemorySegment(new byte[PAGE_SIZE]), FreeingBufferRecycler.INSTANCE);
+		Buffer buf = new Buffer(
+				MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
+				FreeingBufferRecycler.INSTANCE);
+		
 		buf.setSize(SIZE_COUNTER++);
 		return new BufferOrEvent(buf, channel);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index c6010d6..b9b6e5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -339,7 +339,7 @@ public class BarrierTrackerTest {
 
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
-				new Buffer(new MemorySegment(new byte[] { 1, 2 }), FreeingBufferRecycler.INSTANCE), channel);
+				new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 355b7c8..e85eddb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -347,7 +348,7 @@ public class BufferSpillerTest {
 	}
 
 	private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
-		MemorySegment seg = new MemorySegment(new byte[PAGE_SIZE]);
+		MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
 		for (int i = 0; i < size; i++) {
 			seg.put(i, (byte) i);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 7cc1958..45bbbda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
@@ -33,7 +33,6 @@ import org.junit.runner.RunWith;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -95,7 +94,9 @@ public class StreamRecordWriterTest {
 		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
 			@Override
 			public Buffer answer(InvocationOnMock invocation) {
-				return new Buffer(new MemorySegment(new byte[4096]), FreeingBufferRecycler.INSTANCE);
+				return new Buffer(
+						MemorySegmentFactory.allocateUnpooledSegment(4096),
+						FreeingBufferRecycler.INSTANCE);
 			}
 		});
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9091fa7..4fec118 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -40,8 +40,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -95,7 +94,7 @@ public class StreamMockEnvironment implements Environment {
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
 
-		this.memManager = new DefaultMemoryManager(memorySize, 1);
+		this.memManager = new MemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
@@ -117,7 +116,9 @@ public class StreamMockEnvironment implements Environment {
 
 				@Override
 				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+					return new Buffer(
+							MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+							mock(BufferRecycler.class));
 				}
 			});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 987d0bc..96dbeab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
index 889255c..b61a9b6 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
@@ -18,22 +18,19 @@
 
 package org.apache.flink.tez.runtime;
 
-
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 public class TezRuntimeEnvironment {
-
-	private static int DEFAULT_PAGE_SIZE = 32768;
-
+	
 	private final IOManager ioManager;
 
 	private final MemoryManager memoryManager;
 
 	public TezRuntimeEnvironment(long totalMemory) {
-		this.memoryManager = new DefaultMemoryManager(totalMemory, 1, DefaultMemoryManager.DEFAULT_PAGE_SIZE, true);
+		this.memoryManager = new MemoryManager(totalMemory, 1, MemoryManager.DEFAULT_PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index 47fbad7..a745177 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -35,7 +35,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index 493de07..cb20e72 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.operators.hash.MutableHashTable;
@@ -183,7 +184,7 @@ public class HashTableRecordWidthCombinations {
 	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
 		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
 		for (int i = 0; i < numSegments; i++) {
-			list.add(new MemorySegment(new byte[segmentSize]));
+			list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
 		}
 		return list;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index 2da29e8..c11b93c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
@@ -38,8 +37,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
@@ -83,7 +81,7 @@ public class MassiveStringSorting {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
 					
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
@@ -173,7 +171,7 @@ public class MassiveStringSorting {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
 					
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index bc65ab9..7a484e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -37,8 +37,7 @@ import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
@@ -82,7 +81,7 @@ public class MassiveStringValueSorting {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
 					
 				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
@@ -129,9 +128,11 @@ public class MassiveStringValueSorting {
 		}
 		finally {
 			if (input != null) {
+				//noinspection ResultOfMethodCallIgnored
 				input.delete();
 			}
 			if (sorted != null) {
+				//noinspection ResultOfMethodCallIgnored
 				sorted.delete();
 			}
 		}
@@ -173,7 +174,7 @@ public class MassiveStringValueSorting {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
 					
 				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
@@ -251,9 +252,11 @@ public class MassiveStringValueSorting {
 		}
 		finally {
 			if (input != null) {
+				//noinspection ResultOfMethodCallIgnored
 				input.delete();
 			}
 			if (sorted != null) {
+				//noinspection ResultOfMethodCallIgnored
 				sorted.delete();
 			}
 		}
@@ -348,7 +351,9 @@ public class MassiveStringValueSorting {
 				wrt.newLine();
 			}
 		} finally {
-			wrt.close();
+			if (wrt != null) {
+				wrt.close();
+			}
 		}
 		
 		return f;
@@ -388,7 +393,9 @@ public class MassiveStringValueSorting {
 				wrt.newLine();
 			}
 		} finally {
-			wrt.close();
+			if (wrt != null) {
+				wrt.close();
+			}
 		}
 		
 		return f;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index 157aa0d..dd27eb5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -28,7 +28,7 @@ import java.io.BufferedReader
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
 import java.io.FileReader
 import org.apache.flink.util.MutableObjectIterator
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
@@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase {
           0,
           new ExecutionConfig)
         
-        val mm = new DefaultMemoryManager(1024 * 1024, 1)
+        val mm = new MemoryManager(1024 * 1024, 1)
         val ioMan = new IOManagerAsync()
         
         sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index 21c6581..1b48612 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -18,25 +18,19 @@
 
 package org.apache.flink.api.scala.runtime
 
+import java.util
+import java.util.{ArrayList, List, Random}
+
 import org.apache.flink.api.common.ExecutionConfig
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.flink.api.scala._
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.core.memory.DataInputView
-import java.io.IOException
-import org.apache.flink.api.common.typeutils.TypeComparator
-import com.amazonaws.services.sqs.model.UnsupportedOperationException
-import org.apache.flink.core.memory.MemorySegment
-import org.apache.flink.core.memory.DataOutputView
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
+import org.apache.flink.api.scala._
+import org.apache.flink.core.memory._
+import org.apache.flink.runtime.operators.sort.{NormalizedKeySorter, QuickSort}
+
+import org.junit.Assert._
+import org.junit.Test
 import org.mockito.Mockito
-import org.apache.flink.runtime.operators.sort.NormalizedKeySorter
-import java.util.List
-import java.util.ArrayList
-import org.apache.flink.runtime.operators.sort.QuickSort
-import java.util.Random
 
 class CaseClassComparatorTest {
 
@@ -78,9 +72,9 @@ class CaseClassComparatorTest {
       
       
       val numMemSegs = 20
-      val memory : List[MemorySegment] = new ArrayList[MemorySegment](numMemSegs)
+      val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
       for (i <- 1 to numMemSegs) {
-        memory.add(new MemorySegment(new Array[Byte](32*1024)))
+        memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
       }
       
       val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index b95eb86..c9e193f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
 import org.apache.flink.yarn.Messages.StopYarnSession
 
@@ -32,7 +32,7 @@ import org.apache.flink.yarn.Messages.StopYarnSession
 class YarnTaskManager(
     config: TaskManagerConfiguration,
     connectionInfo: InstanceConnectionInfo,
-    memoryManager: DefaultMemoryManager,
+    memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cdee6f6..7f17396 100644
--- a/pom.xml
+++ b/pom.xml
@@ -803,7 +803,7 @@ under the License.
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-checkstyle-plugin</artifactId>
-				<version>2.12.1</version>
+				<version>2.16</version>
 				<executions>
 					<execution>
 						<id>validate</id>

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index a2864a0..ecedb19 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -52,7 +52,7 @@ under the License.
 		<module name="Regexp">
 			<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
 			<property name="illegalPattern" value="true"/>
-			<property name="message" value="Use Guava Checks instead of Commons Validate. Please refer coding guidelines."/>
+			<property name="message" value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
 		</module>
 		<module name="NeedBraces">
 			<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>


Mime
View raw message