flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [06/15] flink git commit: [FLINK-1320] [core] Add an off-heap variant of the managed memory
Date Tue, 08 Sep 2015 18:58:56 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 529d3d1..f11b933 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration._
-
+import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.memory.MemoryManager.HeapMemoryPool
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
@@ -50,14 +51,13 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription,
-InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
+import org.apache.flink.runtime.memory.MemoryManager 
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
@@ -136,7 +136,7 @@ class TaskManager(
   protected val askTimeout = new Timeout(config.timeout)
 
   /** The TaskManager's physical execution resources */
-  protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
+  protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize())
 
   /** Registry of all tasks currently executed by this TaskManager */
   protected val runningTasks = new java.util.HashMap[ExecutionAttemptID, Task]()
@@ -1548,7 +1548,8 @@ object TaskManager {
 
     val (taskManagerConfig : TaskManagerConfiguration,      
       netConfig: NetworkEnvironmentConfiguration,
-      connectionInfo: InstanceConnectionInfo
+      connectionInfo: InstanceConnectionInfo,
+      memType: MemoryType
     ) = parseTaskManagerConfiguration(
       configuration,
       taskManagerHostname,
@@ -1577,7 +1578,7 @@ object TaskManager {
       LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
       configuredMemory << 20 // megabytes to bytes
     }
-    else {
+    else if (memType == MemoryType.HEAP) {
       val fraction = configuration.getFloat(
         ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
@@ -1589,7 +1590,24 @@ object TaskManager {
         fraction).toLong
 
       LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-        s"memory (${relativeMemSize >> 20} MB).")
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    }
+    else {
+      val ratio = configuration.getFloat(
+        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
+        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
+      
+      checkConfigParameter(ratio > 0.0f,
+        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
+        "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
+      
+      val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
+      val relativeMemSize = (maxHeapSize * ratio).toLong
+
+      LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
+        s"managed off-heap memory (${relativeMemSize >> 20} MB).")
 
       relativeMemSize
     }
@@ -1598,16 +1616,27 @@ object TaskManager {
 
     // now start the memory manager
     val memoryManager = try {
-      new DefaultMemoryManager(
+      new MemoryManager(
         memorySize,
         taskManagerConfig.numberOfSlots,
         netConfig.networkBufferSize,
+        memType,
         preAllocateMemory)
     }
     catch {
-      case e: OutOfMemoryError => throw new Exception(
-        "OutOfMemory error (" + e.getMessage + ") while allocating the TaskManager memory (" +
-          memorySize + " bytes).", e)
+      case e: OutOfMemoryError => 
+        memType match {
+          case MemoryType.HEAP =>
+            throw new Exception(s"OutOfMemory error (${e.getMessage()})" + 
+              s" while allocating the TaskManager heap memory (${memorySize} bytes).", e)
+            
+          case MemoryType.OFF_HEAP =>
+            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
+              s" while allocating the TaskManager off-heap memory (${memorySize} bytes). " +
+              s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
+            
+          case _ => throw e
+        }
     }
 
     // start the I/O manager last, it will create some temp directories.
@@ -1692,7 +1721,8 @@ object TaskManager {
       localTaskManagerCommunication: Boolean)
     : (TaskManagerConfiguration,
      NetworkEnvironmentConfiguration,
-     InstanceConnectionInfo) = {
+     InstanceConnectionInfo,
+     MemoryType) = {
 
     // ------- read values from the config and check them ---------
     //                      (a lot of them)
@@ -1738,9 +1768,9 @@ object TaskManager {
     val pageSize: Int =
       if (pageSizeNew != -1) {
         // new page size has been configured
-        checkConfigParameter(pageSizeNew >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeNew,
+        checkConfigParameter(pageSizeNew >= MemoryManager.MIN_PAGE_SIZE, pageSizeNew,
           ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-          "Minimum memory segment size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+          "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)
 
         checkConfigParameter(MathUtils.isPowerOf2(pageSizeNew), pageSizeNew,
           ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
@@ -1754,9 +1784,9 @@ object TaskManager {
       }
       else {
         // old page size has been configured
-        checkConfigParameter(pageSizeOld >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSizeOld,
+        checkConfigParameter(pageSizeOld >= MemoryManager.MIN_PAGE_SIZE, pageSizeOld,
           ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-          "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+          "Minimum buffer size is " + MemoryManager.MIN_PAGE_SIZE)
 
         checkConfigParameter(MathUtils.isPowerOf2(pageSizeOld), pageSizeOld,
           ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
@@ -1765,6 +1795,35 @@ object TaskManager {
         pageSizeOld
       }
     
+    // check whether we use heap or off-heap memory
+    val memType: MemoryType = 
+      if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+        MemoryType.OFF_HEAP
+      } else {
+        MemoryType.HEAP
+      }
+    
+    // initialize the memory segment factory accordingly
+    memType match {
+      case MemoryType.HEAP =>
+        if (!MemorySegmentFactory.isInitialized()) {
+          MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY)
+        }
+        else if (MemorySegmentFactory.getFactory() != HeapMemorySegment.FACTORY) {
+          throw new Exception("Memory type is set to heap memory, but memory segment " +
+            "factory has been initialized for off-heap memory segments")
+        }
+
+      case MemoryType.OFF_HEAP =>
+        if (!MemorySegmentFactory.isInitialized()) {
+          MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY)
+        }
+        else if (MemorySegmentFactory.getFactory() != HybridMemorySegment.FACTORY) {
+          throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+            "factory has been initialized for heap memory segments")
+        }
+    }
+    
     val tmpDirs = configuration.getString(
       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
@@ -1783,7 +1842,8 @@ object TaskManager {
     }
 
     // Default spill I/O mode for intermediate results
-    val syncOrAsync = configuration.getString(ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+    val syncOrAsync = configuration.getString(
+      ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
@@ -1791,6 +1851,7 @@ object TaskManager {
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
+      memType,
       ioMode,
       nettyConfig)
 
@@ -1834,7 +1895,7 @@ object TaskManager {
       slots,
       configuration)
 
-    (taskManagerConfig, networkConfig, connectionInfo)
+    (taskManagerConfig, networkConfig, connectionInfo, memType)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index f2a5e2d..a44916a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.io.disk;
 import java.io.EOFException;
 import java.util.List;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.junit.Assert;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
@@ -32,8 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.Key;
@@ -78,7 +78,7 @@ public class ChannelViewsTest
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, true);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index dcc1e5f..5deb50e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,8 +30,7 @@ import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.PairGenerator;
 import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
@@ -66,7 +66,8 @@ public class FileChannelStreamsITCase {
 
 	@Before
 	public void beforeTest() {
-		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE, true);
+		memManager = new MemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1,
+				MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
 		ioManager = new IOManagerAsync();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index f9b8b38..1c2b3de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -26,13 +26,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteOutputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
+			MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
@@ -78,7 +78,7 @@ public class FileChannelStreamsTest {
 	public void testCloseAndDeleteInputView() {
 		final IOManager ioManager = new IOManagerAsync();
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024, true);
+			MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index c071bef..4c6a2b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -25,12 +25,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.junit.Test;
 
@@ -45,7 +45,7 @@ public class SeekableFileChannelInputViewTest {
 		// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
 		
 		try {
-			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, true);
+			MemoryManager memMan = new MemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
 			
@@ -71,7 +71,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to the middle of the 3rd page
 			int i = 2 * PAGE_SIZE + PAGE_SIZE / 4;
@@ -82,7 +82,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to the end
 			i = 120000 - 4;
@@ -93,7 +93,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to the beginning
 			i = 0;
@@ -104,7 +104,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to after a page
 			i = PAGE_SIZE;
@@ -115,7 +115,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to after a page
 			i = 3 * PAGE_SIZE;
@@ -126,7 +126,7 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek to the end
 			i = NUM_RECORDS;
@@ -134,17 +134,17 @@ public class SeekableFileChannelInputViewTest {
 			try {
 				in.readInt();
 				fail("should throw EOF exception");
-			} catch (EOFException e) {}
+			} catch (EOFException ignored) {}
 			
 			// seek out of bounds
 			try {
 				in.seek(-10);
 				fail("should throw an exception");
-			} catch (IllegalArgumentException e) {}
+			} catch (IllegalArgumentException ignored) {}
 			try {
 				in.seek(NUM_RECORDS + 1);
 				fail("should throw an exception");
-			} catch (IllegalArgumentException e) {}
+			} catch (IllegalArgumentException ignored) {}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 6a9a20a..0b1e0c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk;
 
 import org.apache.flink.core.memory.DataInputView;
@@ -24,9 +23,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.memory.ListMemorySegmentSource;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
@@ -34,6 +32,7 @@ import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode
 import org.apache.flink.runtime.operators.testutils.TestData.Key;
 import org.apache.flink.runtime.operators.testutils.TestData.Value;
 import org.apache.flink.types.Record;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,7 +41,6 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.util.ArrayList;
 
-
 public class SpillingBufferTest {
 	
 	private static final long SEED = 649180756312423613L;
@@ -69,7 +67,7 @@ public class SpillingBufferTest {
 
 	@Before
 	public void beforeTest() {
-		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		ioManager = new IOManagerAsync();
 	}
 
@@ -91,8 +89,7 @@ public class SpillingBufferTest {
 	// --------------------------------------------------------------------------------------------
 	
 	@Test
-	public void testWriteReadInMemory() throws Exception
-	{
+	public void testWriteReadInMemory() throws Exception {
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		
 		// create the writer output view
@@ -150,9 +147,9 @@ public class SpillingBufferTest {
 	}
 	
 	@Test
-	public void testWriteReadTooMuchInMemory() throws Exception
-	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	public void testWriteReadTooMuchInMemory() throws Exception {
+		final TestData.Generator generator = new TestData.Generator(
+				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -217,9 +214,9 @@ public class SpillingBufferTest {
 	// --------------------------------------------------------------------------------------------
 	
 	@Test
-	public void testWriteReadExternal() throws Exception
-	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	public void testWriteReadExternal() throws Exception {
+		final TestData.Generator generator = new TestData.Generator(
+				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -276,9 +273,9 @@ public class SpillingBufferTest {
 	}
 
 	@Test
-	public void testWriteReadTooMuchExternal() throws Exception
-	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	public void testWriteReadTooMuchExternal() throws Exception {
+		final TestData.Generator generator = new TestData.Generator(
+				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index 49e93c6..a471e66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 import org.junit.Test;
@@ -274,7 +276,7 @@ public class AsynchronousFileIOChannelTest {
 		try {
 
 			final int NUM_BLOCKS = 100;
-			final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
 
 			final AtomicInteger callbackCounter = new AtomicInteger();
 			final AtomicBoolean exceptionOccurred = new AtomicBoolean();
@@ -336,7 +338,7 @@ public class AsynchronousFileIOChannelTest {
 
 	private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
 		try {
-			MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
 			FileIOChannel.ID channelId = ioMan.createChannel();
 
 			BlockChannelWriterWithCallback<MemorySegment> writer = new AsynchronousBlockWriterWithCallback(channelId,
@@ -371,7 +373,7 @@ public class AsynchronousFileIOChannelTest {
 			finally {
 				try {
 					writer.closeAndDelete();
-				} catch (Throwable t) {}
+				} catch (Throwable ignored) {}
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 294a6e6..c1bd465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
@@ -146,7 +147,9 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 			fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
 
-			currentNumber = verifyBufferFilledWithAscendingNumbers(new Buffer(new MemorySegment(buffer.array()), BUFFER_RECYCLER), currentNumber, fileSegment.getLength());
+			currentNumber = verifyBufferFilledWithAscendingNumbers(
+					new Buffer(MemorySegmentFactory.wrap(buffer.array()), BUFFER_RECYCLER), 
+					currentNumber, fileSegment.getLength());
 		}
 
 		reader.close();
@@ -169,7 +172,7 @@ public class BufferFileWriterFileSegmentReaderTest {
 	}
 
 	private Buffer createBuffer() {
-		return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
 	}
 
 	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index b0c702a..24d2864 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -194,7 +196,7 @@ public class BufferFileWriterReaderTest {
 	}
 
 	private Buffer createBuffer() {
-		return new Buffer(new MemorySegment(new byte[BUFFER_SIZE]), BUFFER_RECYCLER);
+		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
 	}
 
 	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index 435588f..4656d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,7 +63,7 @@ public class IOManagerAsyncTest {
 			final FileIOChannel.ID channelID = this.ioManager.createChannel();
 			final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channelID);
 			
-			MemorySegment memSeg = new MemorySegment(new byte[32 * 1024]);
+			MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
 			
 			for (int i = 0; i < NUM_IOS; i++) {
 				for (int pos = 0; pos < memSeg.size(); pos += 4) {
@@ -103,7 +104,7 @@ public class IOManagerAsyncTest {
 		try {
 			final List<MemorySegment> memSegs = new ArrayList<MemorySegment>();
 			for (int i = 0; i < NUM_SEGS; i++) {
-				memSegs.add(new MemorySegment(new byte[32 * 1024]));
+				memSegs.add(MemorySegmentFactory.allocateUnpooledSegment(32 * 1024));
 			}
 			
 			final FileIOChannel.ID channelID = this.ioManager.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 52908d3..6c25117 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -35,7 +35,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 /**
  * Integration test case for the I/O manager.
@@ -54,11 +54,11 @@ public class IOManagerITCase {
 
 	private IOManager ioManager;
 
-	private DefaultMemoryManager memoryManager;
+	private MemoryManager memoryManager;
 
 	@Before
 	public void beforeTest() {
-		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		ioManager = new IOManagerAsync();
 	}
 
@@ -209,7 +209,7 @@ public class IOManagerITCase {
 		}
 	}
 	
-	private static final int skewedSample(Random rnd, int max) {
+	private static int skewedSample(Random rnd, int max) {
 		double uniform = rnd.nextDouble();
 		double var = Math.pow(uniform, 8.0);
 		double pareto = 0.2 / var;

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index 3bdc9bd..fd02623 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.junit.After;
 import org.junit.Before;
@@ -66,14 +66,14 @@ public class IOManagerPerformanceBenchmark {
 	
 	private static final AbstractInvokable memoryOwner = new DummyInvokable();
 	
-	private DefaultMemoryManager memManager;
+	private MemoryManager memManager;
 	
 	private IOManager ioManager;
 	
 	
 	@Before
 	public void startup() {
-		memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
+		memManager = new MemoryManager(MEMORY_SIZE, 1);
 		ioManager = new IOManagerAsync();
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index d657ebf..420199c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network;
 import static org.junit.Assert.*;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -55,7 +56,8 @@ public class NetworkEnvironmentTest {
 		try {
 			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
 			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-					NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
+					NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
+					IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
 					new Tuple2<Integer, Integer>(0, 0));
 
 			NetworkEnvironment env = new NetworkEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index e7d0524..8455402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.Util;
-import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
-import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
 import org.junit.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -375,7 +378,7 @@ public class PagedViewsTest {
 		private final int segmentSize;
 
 		private TestOutputView(int segmentSize) {
-			super(new MemorySegment(new byte[segmentSize]), segmentSize, 0);
+			super(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), segmentSize, 0);
 
 			this.segmentSize = segmentSize;
 		}
@@ -383,7 +386,7 @@ public class PagedViewsTest {
 		@Override
 		protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
 			segments.add(new SegmentWithPosition(current, positionInCurrent));
-			return new MemorySegment(new byte[segmentSize]);
+			return MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
 		}
 
 		public void close() {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index cd6d580..819a94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.Util;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -125,7 +127,7 @@ public class SpanningRecordSerializationTest {
 	{
 		final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class));
+		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
 
 		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index 50d3639..b7bcb3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
 import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.runtime.io.network.api.serialization.types.Util;
@@ -41,7 +42,7 @@ public class SpanningRecordSerializerTest {
 		final int SEGMENT_SIZE = 16;
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasData());
@@ -75,7 +76,7 @@ public class SpanningRecordSerializerTest {
 		final int SEGMENT_SIZE = 11;
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
 		try {
 			Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
@@ -201,7 +202,7 @@ public class SpanningRecordSerializerTest {
 		final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class));
+		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
 
 		// -------------------------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9e10582..f8cd28f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
@@ -25,9 +26,11 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -41,14 +44,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -158,7 +159,7 @@ public class RecordWriterTest {
 		BufferPool bufferPool = null;
 
 		try {
-			buffers = new NetworkBufferPool(1, 1024);
+			buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
 			bufferPool = spy(buffers.createBufferPool(1, true));
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 28862e8..0ac84dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +41,7 @@ public class BufferPoolFactoryTest {
 
 	@Before
 	public void setupNetworkBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 734dcfb..fd11d02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -32,7 +33,7 @@ public class BufferTest {
 
 	@Test
 	public void testSetGetSize() {
-		final MemorySegment segment = new MemorySegment(new byte[1024]);
+		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
 		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
 
 		Buffer buffer = new Buffer(segment, recycler);
@@ -58,7 +59,7 @@ public class BufferTest {
 
 	@Test
 	public void testgetNioBufferThreadSafe() {
-		final MemorySegment segment = new MemorySegment(new byte[1024]);
+		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
 		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
 
 		Buffer buffer = new Buffer(segment, recycler);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index e8e9ec8..93731e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -61,7 +62,7 @@ public class LocalBufferPoolTest {
 
 	@Before
 	public void setupLocalBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
 		localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 6b22cd9..fd5c7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.core.memory.MemoryType;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,7 +36,7 @@ public class NetworkBufferPoolTest {
 			final int bufferSize = 128;
 			final int numBuffers = 10;
 
-			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP);
 			assertEquals(bufferSize, globalPool.getMemorySegmentSize());
 			assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
 			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
@@ -70,7 +71,7 @@ public class NetworkBufferPoolTest {
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
 
 			BufferPool fixedPool = globalPool.createBufferPool(2, true);
 			BufferPool nonFixedPool = globalPool.createBufferPool(5, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 60241e3..f514cbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -53,7 +53,7 @@ public class NettyMessageSerializationTest {
 	@Test
 	public void testEncodeDecode() {
 		{
-			Buffer buffer = spy(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
+			Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
 			ByteBuffer nioBuffer = buffer.getNioBuffer();
 
 			for (int i = 0; i < 1024; i += 4) {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 65780b4..cfbe99e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -68,7 +68,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 			@Override
 			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
 				if (inputIterator.next(reuse) != null) {
-					final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+					final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
 					serializer.setNextBuffer(buffer);
 					serializer.addRecord(reuse);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index cd56318..ea40a55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -94,7 +95,7 @@ public class LocalInputChannelTest {
 
 		final NetworkBufferPool networkBuffers = new NetworkBufferPool(
 				(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
-				TestBufferFactory.BUFFER_SIZE);
+				TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
 
 		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
 				mock(ResultPartitionConsumableNotifier.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 82cc730..f4c37f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
@@ -100,7 +100,8 @@ public class SingleInputGateTest {
 		when(taskEventDispatcher.publish(any(ResultPartitionID.class), any(TaskEvent.class))).thenReturn(true);
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
-		when(iterator.getNextBuffer()).thenReturn(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
+		when(iterator.getNextBuffer()).thenReturn(
+				new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator);

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d9e3562..d628596 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.serialization;
 
-import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -52,7 +52,7 @@ public class LargeRecordsTest {
 			final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
 			final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-			final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
 			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
 			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
@@ -149,7 +149,7 @@ public class LargeRecordsTest {
 			final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
 			final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-			final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
+			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
 			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
 			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index cdba545..4b3b465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.io.network.util;
 
+import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
@@ -59,7 +61,7 @@ public class TestBufferFactory {
 	public Buffer create() {
 		numberOfCreatedBuffers.incrementAndGet();
 
-		return new Buffer(new MemorySegment(new byte[bufferSize]), bufferRecycler);
+		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
 	}
 
 	public Buffer createFrom(MemorySegment segment) {
@@ -85,7 +87,7 @@ public class TestBufferFactory {
 	public static Buffer createBuffer(int bufferSize) {
 		checkArgument(bufferSize > 0);
 
-		return new Buffer(new MemorySegment(new byte[bufferSize]), RECYCLER);
+		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
 	}
 
 	public static Buffer getMockBuffer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
index fc88207..15251e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 
 import org.junit.After;
@@ -48,14 +47,14 @@ public class MemoryManagerLazyAllocationTest {
 	
 	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
 
-	private DefaultMemoryManager memoryManager;
+	private MemoryManager memoryManager;
 
 	private Random random;
 
 	
 	@Before
 	public void setUp() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, false);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, false);
 		this.random = new Random(RANDOM_SEED);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index c0f32ca..a20a180 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -23,9 +23,8 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.junit.Assert;
@@ -48,14 +47,14 @@ public class MemoryManagerTest {
 	
 	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
 
-	private DefaultMemoryManager memoryManager;
+	private MemoryManager memoryManager;
 
 	private Random random;
 
 	
 	@Before
 	public void setUp() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, true);
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.random = new Random(RANDOM_SEED);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/655a891d/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
new file mode 100644
index 0000000..fad1b0e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemorySegmentSimpleTest {
+	
+	public static final long RANDOM_SEED = 643196033469871L;
+
+	public static final int MANAGED_MEMORY_SIZE = 1024 * 1024 * 16;
+
+	public static final int PAGE_SIZE = 1024 * 512;
+
+	private MemoryManager manager;
+
+	private MemorySegment segment;
+
+	private Random random;
+
+	/**
+	 * Builds a MemoryManager configured with MemoryType.HEAP, takes a single page
+	 * from it and seeds the random generator. Any failure propagates unchanged, so
+	 * JUnit reports the original exception instead of a generic message.
+	 */
+	@Before
+	public void setUp() throws Exception {
+		this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+		this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
+		this.random = new Random(RANDOM_SEED);
+	}
+
+	/** Releases the page and checks the manager is empty; tolerates a setUp that failed part-way. */
+	@After
+	public void tearDown() {
+		if (this.manager != null && this.segment != null) {
+			this.manager.release(this.segment);
+			assertTrue("Not all memory has been properly released.", this.manager.verifyEmpty());
+		}
+		this.manager = null;
+		this.segment = null;
+		this.random = null;
+	}
+
+	@Test
+	public void bulkByteAccess() {
+
+		// both writes below would end one byte past PAGE_SIZE and must be rejected
+		{
+			byte[] bytes = new byte[PAGE_SIZE / 4];
+
+			try {
+				segment.put(3 * (PAGE_SIZE / 4) + 1, bytes);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.put(7 * (PAGE_SIZE / 8) + 1, bytes, 0, bytes.length / 2);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// whole-array put/get: fill the page in eight PAGE_SIZE / 8 chunks, then replay the seed to verify
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			byte[] src = new byte[PAGE_SIZE / 8];
+			for (int i = 0; i < 8; i++) {
+				random.nextBytes(src);
+				segment.put(i * (PAGE_SIZE / 8), src);
+			}
+
+			random.setSeed(seed);
+			byte[] expected = new byte[PAGE_SIZE / 8];
+			byte[] actual = new byte[PAGE_SIZE / 8];
+			for (int i = 0; i < 8; i++) {
+				random.nextBytes(expected);
+				segment.get(i * (PAGE_SIZE / 8), actual);
+
+				assertArrayEquals(expected, actual);
+			}
+		}
+
+		// put/get with explicit array offset and length: copy the page in sixteen slices and compare
+		{
+			byte[] expected = new byte[PAGE_SIZE];
+			random.nextBytes(expected);
+
+			for (int i = 0; i < 16; i++) {
+				segment.put(i * (PAGE_SIZE / 16), expected, i * (PAGE_SIZE / 16),
+					PAGE_SIZE / 16);
+			}
+
+			byte[] actual = new byte[PAGE_SIZE];
+			for (int i = 0; i < 16; i++) {
+				segment.get(i * (PAGE_SIZE / 16), actual, i * (PAGE_SIZE / 16),
+					PAGE_SIZE / 16);
+			}
+
+			assertArrayEquals(expected, actual);
+		}
+	}
+
+	@Test
+	public void byteAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.put(-1, (byte) 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.put(PAGE_SIZE, (byte) 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.get(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.get(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write one seeded pseudo-random byte per offset, then replay the seed and compare on read-back
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i < PAGE_SIZE; i++) {
+				segment.put(i, (byte) random.nextInt());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i < PAGE_SIZE; i++) {
+				assertEquals((byte) random.nextInt(), segment.get(i));
+			}
+		}
+	}
+
+	@Test
+	public void booleanAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putBoolean(-1, false);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putBoolean(PAGE_SIZE, false);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getBoolean(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getBoolean(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write one seeded pseudo-random boolean per offset, then replay the seed and compare on read-back
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i < PAGE_SIZE; i++) {
+				segment.putBoolean(i, random.nextBoolean());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i < PAGE_SIZE; i++) {
+				assertEquals(random.nextBoolean(), segment.getBoolean(i));
+			}
+		}
+	}
+
+	@Test
+	public void charAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putChar(-1, 'a');
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putChar(PAGE_SIZE, 'a');
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getChar(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getChar(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random letter 'a'..'z' at every even offset, then replay the seed and compare
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+				segment.putChar(i, (char) ('a' + random.nextInt(26)));
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+				assertEquals((char) ('a' + random.nextInt(26)), segment.getChar(i));
+			}
+		}
+	}
+
+	@Test
+	public void doubleAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putDouble(-1, 0.0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putDouble(PAGE_SIZE, 0.0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getDouble(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getDouble(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random double every 8 bytes, then replay the seed and require exact equality (delta 0.0)
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+				segment.putDouble(i, random.nextDouble());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+				assertEquals(random.nextDouble(), segment.getDouble(i), 0.0);
+			}
+		}
+	}
+
+	@Test
+	public void floatAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putFloat(-1, 0.0f);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putFloat(PAGE_SIZE, 0.0f);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getFloat(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getFloat(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random float every 4 bytes, then replay the seed and require exact equality (delta 0.0)
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+				segment.putFloat(i, random.nextFloat());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+				assertEquals(random.nextFloat(), segment.getFloat(i), 0.0);
+			}
+		}
+	}
+
+	@Test
+	public void longAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putLong(-1, 0L);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putLong(PAGE_SIZE, 0L);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getLong(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getLong(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random long every 8 bytes, then replay the seed and compare on read-back
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+				segment.putLong(i, random.nextLong());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 8; i += 8) {
+				assertEquals(random.nextLong(), segment.getLong(i));
+			}
+		}
+		
+		// unaligned offsets: random steps of 8..31 bytes keep the writes disjoint but off the 8-byte grid
+		{
+			final long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
+				long value = random.nextLong();
+				segment.putLong(offset, value);
+			}
+			
+			random.setSeed(seed);
+			for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
+				long shouldValue = random.nextLong();
+				long isValue = segment.getLong(offset);
+				assertEquals(shouldValue, isValue);
+			}
+		}
+	}
+
+	@Test
+	public void intAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putInt(-1, 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putInt(PAGE_SIZE, 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getInt(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getInt(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random int every 4 bytes, then replay the seed and compare on read-back
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+				segment.putInt(i, random.nextInt());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 4; i += 4) {
+				assertEquals(random.nextInt(), segment.getInt(i));
+			}
+		}
+	}
+
+	@Test
+	public void shortAccess() {
+		// offsets -1 and PAGE_SIZE must both be rejected with an IndexOutOfBoundsException
+		{
+			try {
+				segment.putShort(-1, (short) 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.putShort(PAGE_SIZE, (short) 0);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getShort(-1);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+
+			try {
+				segment.getShort(PAGE_SIZE);
+				fail("IndexOutOfBoundsException expected");
+			} catch (Exception e) {
+				assertTrue(e instanceof IndexOutOfBoundsException);
+			}
+		}
+
+		// write a seeded random short every 2 bytes, then replay the seed and compare on read-back
+		{
+			long seed = random.nextLong();
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+				segment.putShort(i, (short) random.nextInt());
+			}
+
+			random.setSeed(seed);
+			for (int i = 0; i <= PAGE_SIZE - 2; i += 2) {
+				assertEquals((short) random.nextInt(), segment.getShort(i));
+			}
+		}
+	}
+	
+	/**
+	 * Checks that wrap(offset, length) yields a buffer positioned at offset with limit offset + length.
+	 * Unexpected exceptions propagate to JUnit so the report keeps their type and stack trace.
+	 */
+	@Test
+	public void testByteBufferWrapping() throws Exception {
+		MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
+
+		// window near the start of the segment
+		ByteBuffer buf1 = seg.wrap(13, 47);
+		assertEquals(13, buf1.position());
+		assertEquals(60, buf1.limit());
+		assertEquals(47, buf1.remaining());
+
+		// window in the middle of the segment
+		ByteBuffer buf2 = seg.wrap(500, 267);
+		assertEquals(500, buf2.position());
+		assertEquals(767, buf2.limit());
+		assertEquals(267, buf2.remaining());
+
+		// window covering the whole segment
+		ByteBuffer buf3 = seg.wrap(0, 1024);
+		assertEquals(0, buf3.position());
+		assertEquals(1024, buf3.limit());
+		assertEquals(1024, buf3.remaining());
+	}
+}


Mime
View raw message