flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/22] Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Sun, 22 Jun 2014 21:47:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTest.java
new file mode 100644
index 0000000..7460200
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTest.java
@@ -0,0 +1,232 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import eu.stratosphere.nephele.instance.*;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.util.LogUtils;
+
+/**
+ * Tests for {@link eu.stratosphere.nephele.instance.DefaultInstanceManager}.
+ */
+public class DefaultInstanceManagerTest {
+
+	@BeforeClass
+	public static void initLogging() {
+		LogUtils.initializeDefaultTestConsoleLogger();
+	}
+	
+	
+	@Test
+	public void testInstanceRegistering() {
+		try {
+			DefaultInstanceManager cm = new DefaultInstanceManager();
+			TestInstanceListener testInstanceListener = new TestInstanceListener();
+			cm.setInstanceListener(testInstanceListener);
+			
+			
+			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
+			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
+
+			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
+																				2L * 1024L * 1024L * 1024L);
+
+			String hostname = "192.168.198.1";
+			InetAddress address = InetAddress.getByName("192.168.198.1");
+			
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
+			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
+			
+			// register three instances
+			cm.registerTaskManager(ici1, hardwareDescription, 1);
+			cm.registerTaskManager(ici2, hardwareDescription, 1);
+			cm.registerTaskManager(ici3, hardwareDescription, 1);
+			
+
+			assertEquals(3, cm.getNumberOfSlots());
+
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAllocationDeallocation() {
+		try {
+			DefaultInstanceManager cm = new DefaultInstanceManager();
+			TestInstanceListener testInstanceListener = new TestInstanceListener();
+			cm.setInstanceListener(testInstanceListener);
+			
+			
+			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
+			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
+
+			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
+																				2L * 1024L * 1024L * 1024L);
+
+			String hostname = "192.168.198.1";
+			InetAddress address = InetAddress.getByName("192.168.198.1");
+			
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
+			
+			// register two instances
+			cm.registerTaskManager(ici1, hardwareDescription, 1);
+			cm.registerTaskManager(ici2, hardwareDescription, 1);
+
+			assertEquals(2, cm.getNumberOfSlots());
+			
+			
+			// allocate something
+			JobID jobID = new JobID();
+			Configuration conf = new Configuration();
+			cm.requestInstance(jobID, conf, 2);
+			
+			DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 3, 1000);
+			
+			List<AllocatedResource> allocatedResources = testInstanceListener.getAllocatedResourcesForJob(jobID);
+			assertEquals(2, allocatedResources.size());
+			
+			Iterator<AllocatedResource> it = allocatedResources.iterator();
+			Set<AllocationID> allocationIDs = new HashSet<AllocationID>();
+			while (it.hasNext()) {
+				AllocatedResource allocatedResource = it.next();
+
+				if (allocationIDs.contains(allocatedResource.getAllocationID())) {
+					fail("Discovered allocation ID " + allocatedResource.getAllocationID() + " at least twice");
+				} else {
+					allocationIDs.add(allocatedResource.getAllocationID());
+				}
+			}
+
+			// Try to allocate more resources which must result in an error
+			try {
+				cm.requestInstance(jobID, conf, 3);
+
+				fail("ClusterManager allowed to request more instances than actually available");
+
+			} catch (InstanceException ie) {
+				// Exception is expected and correct behavior here
+			}
+
+			// Release all allocated resources
+			it = allocatedResources.iterator();
+			while (it.hasNext()) {
+				final AllocatedResource allocatedResource = it.next();
+				cm.releaseAllocatedResource(allocatedResource);
+			}
+			
+			// Now further allocations should be possible
+			
+			cm.requestInstance(jobID, conf, 1);
+			
+			
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * This test checks the clean-up routines of the cluster manager.
+	 */
+	@Test
+	public void testCleanUp() {
+		try {
+			
+			final int CLEANUP_INTERVAL = 2;
+
+			// configure a short cleanup interval
+			Configuration config = new Configuration();
+			config.setInteger("instancemanager.cluster.cleanupinterval", CLEANUP_INTERVAL);
+			GlobalConfiguration.includeConfiguration(config);
+
+			DefaultInstanceManager cm = new DefaultInstanceManager();
+			TestInstanceListener testInstanceListener = new TestInstanceListener();
+			cm.setInstanceListener(testInstanceListener);
+
+
+			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
+			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
+
+			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
+																				2L * 1024L * 1024L * 1024L);
+
+			String hostname = "192.168.198.1";
+			InetAddress address = InetAddress.getByName("192.168.198.1");
+
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
+			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
+
+			// register three instances
+			cm.registerTaskManager(ici1, hardwareDescription, 1);
+			cm.registerTaskManager(ici2, hardwareDescription, 1);
+			cm.registerTaskManager(ici3, hardwareDescription, 1);
+
+			assertEquals(3, cm.getNumberOfSlots());
+
+			// request some instances
+			JobID jobID = new JobID();
+			Configuration conf = new Configuration();
+
+			cm.requestInstance(jobID, conf, 1);
+
+			DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 1, 1000);
+			assertEquals(1, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
+
+			// wait for the cleanup to kick in
+			Thread.sleep(2000 * CLEANUP_INTERVAL);
+
+			// check that the instances are gone
+			DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 0, 1000);
+			assertEquals(0, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
+
+
+			assertEquals(0, cm.getNumberOfSlots());
+
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTestUtils.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTestUtils.java
new file mode 100644
index 0000000..ca3d971
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/DefaultInstanceManagerTestUtils.java
@@ -0,0 +1,66 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance.cluster;
+
+import eu.stratosphere.nephele.instance.InstanceListener;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * This class contains utility methods used during the tests of the {@link eu.stratosphere.nephele.instance.DefaultInstanceManager} implementation.
+ * 
+ */
+public class DefaultInstanceManagerTestUtils {
+
+	/**
+	 * Granularity of the sleep time.
+	 */
+	private static final long SLEEP_TIME = 10; // 10 milliseconds
+
+	/**
+	 * Private constructor so the class cannot be instantiated.
+	 */
+	private DefaultInstanceManagerTestUtils() {
+	}
+
+	/**
+	 * Waits until, through registrations or deregistrations, a specific number of instances is registered with the given
+	 * {@link InstanceListener} object for a given job, or until the maximum wait time has elapsed.
+	 * 
+	 * @param jobID
+	 *        the ID of the job to check the instance registration for
+	 * @param instanceListener
+	 *        the listener which shall be notified when a requested instance is available for the job
+	 * @param numberOfInstances
+	 *        the number of registered instances to wait for
+	 * @param maxWaitTime
+	 *        the maximum wait time before this method returns
+	 */
+	public static void waitForInstances(JobID jobID, TestInstanceListener instanceListener,
+			int numberOfInstances, long maxWaitTime) {
+
+		final long startTime = System.currentTimeMillis();
+
+		while (instanceListener.getNumberOfAllocatedResourcesForJob(jobID) != numberOfInstances) {
+			try {
+				Thread.sleep(SLEEP_TIME);
+			} catch (InterruptedException e) {
+				break;
+			}
+
+			if ((System.currentTimeMillis() - startTime) >= maxWaitTime) {
+				break;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java
index 952e588..1bac907 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java
@@ -16,30 +16,22 @@ package eu.stratosphere.nephele.instance.cluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
+import eu.stratosphere.nephele.instance.*;
 import org.junit.Test;
 
 import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 
 /**
- * Tests for {@link ClusterInstance}.
+ * Tests for {@link eu.stratosphere.nephele.instance.Instance}.
  * 
  */
 public class HostInClusterTest {
@@ -49,7 +41,7 @@ public class HostInClusterTest {
 	 * 
 	 * @return a cluster instance of a special test type
 	 */
-	private ClusterInstance createTestClusterInstance() {
+	private Instance createTestClusterInstance() {
 
 		final int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
 		final int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
@@ -60,16 +52,8 @@ public class HostInClusterTest {
 			fail(e.getMessage());
 		}
 
-		final String identifier = "testtype";
-		final int numComputeUnits = 8;
 		final int numCores = 8;
 		final int memorySize = 32 * 1024;
-		final int diskCapacity = 200;
-		final int pricePerHour = 10;
-
-		final InstanceType capacity = InstanceTypeFactory.construct(identifier, numComputeUnits, numCores, memorySize,
-			diskCapacity,
-			pricePerHour);
 
 		final InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo(inetAddress, ipcPort, dataPort);
 
@@ -77,8 +61,8 @@ public class HostInClusterTest {
 			memorySize * 1024L * 1024L, memorySize * 1024L * 1024L);
 
 		final NetworkTopology topology = NetworkTopology.createEmptyTopology();
-		ClusterInstance host = new ClusterInstance(instanceConnectionInfo, capacity, topology.getRootNode(), topology,
-			hardwareDescription);
+		Instance host = new Instance(instanceConnectionInfo, topology.getRootNode(), topology,
+			hardwareDescription, 8);
 
 		return host;
 	}
@@ -90,7 +74,7 @@ public class HostInClusterTest {
 	public void testHeartBeat() {
 		// check that heart beat is triggered correctly.
 
-		ClusterInstance host = createTestClusterInstance();
+		Instance host = createTestClusterInstance();
 
 		host.reportHeartBeat();
 
@@ -111,33 +95,33 @@ public class HostInClusterTest {
 	@Test
 	public void testAccounting() {
 		// check whether the accounting of capacity works correctly
-		final ClusterInstance host = createTestClusterInstance();
+		final Instance host = createTestClusterInstance();
 		final JobID jobID = new JobID();
-		final int numComputeUnits = 8 / 8;
-		final int numCores = 8 / 8;
-		final int memorySize = 32 * 1024 / 8;
-		final int diskCapacity = 200 / 8;
-		final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize,
-			diskCapacity, -1);
 		for (int run = 0; run < 2; ++run) {
 			// do this twice to check that everything is correctly freed
-			AllocatedSlice[] slices = new AllocatedSlice[8];
+			AllocatedResource[] allocatedSlots = new AllocatedResource[8];
 			for (int i = 0; i < 8; ++i) {
-				slices[i] = host.createSlice(type, jobID);
-				assertNotNull(slices[i]);
-				assertEquals(numComputeUnits, slices[i].getType().getNumberOfComputeUnits());
-				assertEquals(numCores, slices[i].getType().getNumberOfCores());
-				assertEquals(memorySize, slices[i].getType().getMemorySize());
-				assertEquals(diskCapacity, slices[i].getType().getDiskCapacity());
+				try {
+					allocatedSlots[i] = host.allocateSlot(jobID);
+				}catch(InstanceException ex){
+					fail(ex.getMessage());
+				}
+
+				assertNotNull(allocatedSlots[i]);
 			}
 			// now no resources should be left
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID));
+			boolean instanceException = false;
+
+			try{
+				host.allocateSlot(jobID);
+			}catch(InstanceException ex){
+				instanceException = true;
+			}
+
+			assertTrue(instanceException);
 
 			for (int i = 0; i < 8; ++i) {
-				host.removeAllocatedSlice(slices[i].getAllocationID());
+				host.releaseSlot(allocatedSlots[i].getAllocationID());
 			}
 		}
 	}
@@ -149,47 +133,51 @@ public class HostInClusterTest {
 	public void testTermination() {
 		
 		// check whether the accounting of capacity works correctly if terminateAllInstances is called
-		final ClusterInstance host = createTestClusterInstance();
+		final Instance host = createTestClusterInstance();
 		final JobID jobID = new JobID();
-		final int numComputeUnits = 8 / 8;
-		final int numCores = 8 / 8;
-		final int memorySize = 32 * 1024 / 8;
-		final int diskCapacity = 200 / 8;
-		final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize,
-			diskCapacity, -1);
 		for (int run = 0; run < 2; ++run) {
 			// do this twice to check that everything is correctly freed
-			AllocatedSlice[] slices = new AllocatedSlice[8];
+			AllocatedResource[] allocatedResources = new AllocatedResource[8];
 			for (int i = 0; i < 8; ++i) {
-				slices[i] = host.createSlice(type, jobID);
-				assertNotNull(slices[i]);
-				assertEquals(numComputeUnits, slices[i].getType().getNumberOfComputeUnits());
-				assertEquals(numCores, slices[i].getType().getNumberOfCores());
-				assertEquals(memorySize, slices[i].getType().getMemorySize());
-				assertEquals(diskCapacity, slices[i].getType().getDiskCapacity());
+				try {
+					allocatedResources[i] = host.allocateSlot(jobID);
+				}catch (InstanceException ex){
+					fail(ex.getMessage());
+				}
+
+				assertNotNull(allocatedResources[i]);
 			}
+
+			boolean instanceException = false;
 			// now no resources should be left
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID));
-			assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID));
-			List<AllocatedSlice> removedSlices = host.removeAllAllocatedSlices();
-
-			final Set<AllocatedSlice> slicesSet = new HashSet<AllocatedSlice>();
-			for(int i = 0; i < slices.length; ++i) {
-				slicesSet.add(slices[i]);
+			try {
+				host.allocateSlot(jobID);
+			} catch (InstanceException ex){
+				instanceException = true;
+			}
+
+			assertTrue(instanceException);
+			Collection<AllocatedSlot> allocatedSlots = host.removeAllocatedSlots();
+			Set<AllocationID> removedAllocationIDs = new HashSet<AllocationID>();
+
+			for(AllocatedSlot slot: allocatedSlots){
+				removedAllocationIDs.add(slot.getAllocationID());
+			}
+
+			final Set<AllocationID> allocationIDs = new HashSet<AllocationID>();
+			for(int i = 0; i < allocatedResources.length; ++i) {
+				allocationIDs.add(allocatedResources[i].getAllocationID());
 			}
 			
-			final Set<AllocatedSlice> removedSlicesSet = new HashSet<AllocatedSlice>(removedSlices);
-			
+
 			//Check if both sets are equal
-			assertEquals(slicesSet.size(), removedSlices.size());
-			final Iterator<AllocatedSlice> it = slicesSet.iterator();
+			assertEquals(allocationIDs.size(), removedAllocationIDs.size());
+			final Iterator<AllocationID> it = allocationIDs.iterator();
 			while(it.hasNext()) {
-				assertTrue(removedSlicesSet.remove(it.next()));
+				assertTrue(removedAllocationIDs.remove(it.next()));
 			}
 			
-			assertEquals(0, removedSlicesSet.size());
+			assertEquals(0, removedAllocationIDs.size());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMapTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMapTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMapTest.java
deleted file mode 100644
index 974283d..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMapTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.junit.Test;
-
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-
-/**
- * This class checks the {@link PendingRequestsMap} data structure.
- * 
- */
-public class PendingRequestsMapTest {
-
-	/**
-	 * The first instance type used in the tests.
-	 */
-	private static final InstanceType INSTANCE_TYPE1 = InstanceTypeFactory.construct("test1", 1, 1, 2, 2, 0);
-
-	/**
-	 * The second instance type used in the tests.
-	 */
-	private static final InstanceType INSTANCE_TYPE2 = InstanceTypeFactory.construct("test2", 2, 2, 4, 4, 0);
-
-	/**
-	 * Checks the correctness of the {@link PendingRequestsMap} data structure.
-	 */
-	@Test
-	public void testPendingRequestsMap() {
-
-		final PendingRequestsMap prm = new PendingRequestsMap();
-
-		assertFalse(prm.hasPendingRequests());
-
-		prm.addRequest(INSTANCE_TYPE1, 1);
-		prm.addRequest(INSTANCE_TYPE2, 2);
-		prm.addRequest(INSTANCE_TYPE2, 2);
-
-		assertTrue(prm.hasPendingRequests());
-
-		final Iterator<Map.Entry<InstanceType, Integer>> it = prm.iterator();
-		int iterationCounter = 0;
-		while (it.hasNext()) {
-
-			final Map.Entry<InstanceType, Integer> entry = it.next();
-			++iterationCounter;
-
-			if (entry.getKey().equals(INSTANCE_TYPE1)) {
-				assertEquals(1, entry.getValue().intValue());
-			}
-
-			if (entry.getKey().equals(INSTANCE_TYPE2)) {
-				assertEquals(4, entry.getValue().intValue());
-			}
-		}
-
-		assertEquals(2, iterationCounter);
-
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE1);
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE1);
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE1); // This call is actually superfluous
-
-		assertTrue(prm.hasPendingRequests());
-
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE2);
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE2);
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE2);
-		prm.decreaseNumberOfPendingInstances(INSTANCE_TYPE2);
-
-		assertFalse(prm.hasPendingRequests());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
index 92ea5ab..a8f1331 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
@@ -13,16 +13,15 @@
 
 package eu.stratosphere.nephele.instance.local;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.nephele.instance.InstanceManager;
 import junit.framework.Assert;
 
 import org.junit.Test;
 
+import eu.stratosphere.nephele.ExecutionMode;
 import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.jobmanager.JobManager;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 
@@ -53,17 +52,9 @@ public class LocalInstanceManagerTest {
 
 			final TestInstanceListener testInstanceListener = new TestInstanceListener();
 	
-			LocalInstanceManager lm = (LocalInstanceManager) jm.getInstanceManager(); // this is for sure, because I chose the local strategy
+			InstanceManager im = jm.getInstanceManager();
 			try {
-				lm.setInstanceListener(testInstanceListener);
-	
-				final InstanceType defaultInstanceType = lm.getDefaultInstanceType();
-				assertEquals("test", defaultInstanceType.getIdentifier());
-				assertEquals(4, defaultInstanceType.getNumberOfComputeUnits());
-				assertEquals(4, defaultInstanceType.getNumberOfCores());
-				assertEquals(1024, defaultInstanceType.getMemorySize());
-				assertEquals(160, defaultInstanceType.getDiskCapacity());
-				assertEquals(0, defaultInstanceType.getPricePerHour());
+				im.setInstanceListener(testInstanceListener);
 	
 			} catch (Exception e) {
 				e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index 063b827..fa4fbfa 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -34,11 +34,8 @@ import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.JarFileCreator;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 import eu.stratosphere.util.LogUtils;
-import eu.stratosphere.util.StringUtils;
-import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -166,19 +163,23 @@ public class JobManagerITCase {
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
 			i1.setFileInputClass(FileLineReader.class);
 			i1.setFilePath(new Path(new File(testDirectory).toURI()));
+			i1.setNumberOfSubtasks(1);
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
 			t1.setTaskClass(ForwardTask.class);
+			t1.setNumberOfSubtasks(1);
 
 			// task vertex 2
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
 			t2.setTaskClass(ForwardTask.class);
+			t2.setNumberOfSubtasks(1);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
 			o1.setFileOutputClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
+			o1.setNumberOfSubtasks(1);
 
 			t1.setVertexToShareInstancesWith(i1);
 			t2.setVertexToShareInstancesWith(i1);
@@ -473,19 +474,23 @@ public class JobManagerITCase {
 			final JobFileInputVertex i1 = new JobFileInputVertex("Input 1", jg);
 			i1.setFileInputClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile.toURI()));
+			i1.setNumberOfSubtasks(1);
 
 			// task vertex 1
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
 			t1.setTaskClass(ForwardTask.class);
+			t1.setNumberOfSubtasks(1);
 
 			// task vertex 2
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
 			t2.setTaskClass(ForwardTask.class);
+			t2.setNumberOfSubtasks(1);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
 			o1.setFileOutputClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
+			o1.setNumberOfSubtasks(1);
 
 			t1.setVertexToShareInstancesWith(i1);
 			t2.setVertexToShareInstancesWith(i1);
@@ -747,6 +752,7 @@ public class JobManagerITCase {
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
 			o1.setFileOutputClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
+			o1.setNumberOfSubtasks(1);
 
 			i1.setVertexToShareInstancesWith(o1);
 			i2.setVertexToShareInstancesWith(o1);
@@ -877,27 +883,23 @@ public class JobManagerITCase {
 			i1.setFileInputClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile1.toURI()));
 			i1.setNumberOfSubtasks(numberOfSubtasks);
-			i1.setNumberOfSubtasksPerInstance(numberOfSubtasks);
 
 			// input vertex 2
 			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
 			i2.setFileInputClass(FileLineReader.class);
 			i2.setFilePath(new Path(inputFile2.toURI()));
 			i2.setNumberOfSubtasks(numberOfSubtasks);
-			i2.setNumberOfSubtasksPerInstance(numberOfSubtasks);
 
 			// union task
 			final JobTaskVertex f1 = new JobTaskVertex("Forward 1", jg);
 			f1.setTaskClass(DoubleTargetTask.class);
 			f1.setNumberOfSubtasks(numberOfSubtasks);
-			f1.setNumberOfSubtasksPerInstance(numberOfSubtasks);
 
 			// output vertex
 			JobFileOutputVertex o1 = new JobFileOutputVertex("Output", jg);
 			o1.setFileOutputClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile.toURI()));
 			o1.setNumberOfSubtasks(numberOfSubtasks);
-			o1.setNumberOfSubtasksPerInstance(numberOfSubtasks);
 
 			i1.setVertexToShareInstancesWith(o1);
 			i2.setVertexToShareInstancesWith(o1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
new file mode 100644
index 0000000..c8bcddc
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
@@ -0,0 +1,185 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager.scheduler.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
+
+import org.junit.Test;
+
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
+import eu.stratosphere.nephele.executiongraph.GraphConversionException;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
+import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
+import eu.stratosphere.nephele.template.AbstractGenericInputTask;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * This class checks the functionality of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} class.
+ */
+public class DefaultSchedulerTest {
+
+	/**
+	 * Test input task.
+	 * 
+	 */
+	public static final class InputTask extends AbstractGenericInputTask {
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void registerInputOutput() {
+			new RecordWriter<StringRecord>(this);
+		}
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void invoke() throws Exception {
+			// Nothing to do here
+		}
+
+	}
+
+	/**
+	 * Test output task.
+	 * 
+	 */
+	public static final class OutputTask extends AbstractOutputTask {
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void registerInputOutput() {
+			new RecordReader<StringRecord>(this, StringRecord.class);
+		}
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void invoke() throws Exception {
+			// Nothing to do here
+		}
+
+	}
+
+	/**
+	 * Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
+	 * 
+	 * @param channelType
+	 *        the channel type to connect the vertices with
+	 * @return a sample execution graph
+	 */
+	private ExecutionGraph createExecutionGraph(ChannelType channelType) {
+
+		final JobGraph jobGraph = new JobGraph("Job Graph");
+
+		final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
+		inputVertex.setInputClass(InputTask.class);
+		inputVertex.setNumberOfSubtasks(1);
+
+		final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
+		outputVertex.setOutputClass(OutputTask.class);
+		outputVertex.setNumberOfSubtasks(1);
+
+		try {
+			inputVertex.connectTo(outputVertex, channelType);
+		} catch (JobGraphDefinitionException e) {
+			fail(StringUtils.stringifyException(e));
+		}
+
+		try {
+			LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+			return new ExecutionGraph(jobGraph, 1);
+
+		} catch (GraphConversionException e) {
+			fail(StringUtils.stringifyException(e));
+		} catch (IOException e) {
+			fail(StringUtils.stringifyException(e));
+		}
+
+		return null;
+	}
+
+	/**
+	 * Checks the behavior of the scheduleJob() method with a job consisting of two tasks connected via an in-memory
+	 * channel.
+	 */
+	@Test
+	public void testScheduleJobWithInMemoryChannel() {
+
+		final TestInstanceManager tim = new TestInstanceManager();
+		final TestDeploymentManager tdm = new TestDeploymentManager();
+		final DefaultScheduler scheduler = new DefaultScheduler(tdm, tim);
+
+		final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY);
+
+		try {
+			try {
+				scheduler.scheduleJob(executionGraph);
+			} catch (SchedulingException e) {
+				fail(StringUtils.stringifyException(e));
+			}
+
+			// Wait for the deployment to complete
+			tdm.waitForDeployment();
+
+			assertEquals(executionGraph.getJobID(), tdm.getIDOfLastDeployedJob());
+			final List<ExecutionVertex> listOfDeployedVertices = tdm.getListOfLastDeployedVertices();
+			assertNotNull(listOfDeployedVertices);
+			// Vertices connected via in-memory channels must be deployed in a single cycle.
+			assertEquals(2, listOfDeployedVertices.size());
+
+			// Check if the release of the allocated resources works properly by simulating the vertices' life cycle
+			assertEquals(0, tim.getNumberOfReleaseMethodCalls());
+
+			// Simulate vertex life cycle
+			for (final ExecutionVertex vertex : listOfDeployedVertices) {
+				vertex.updateExecutionState(ExecutionState.STARTING);
+				vertex.updateExecutionState(ExecutionState.RUNNING);
+				vertex.updateExecutionState(ExecutionState.FINISHING);
+				vertex.updateExecutionState(ExecutionState.FINISHED);
+			}
+
+			assertEquals(1, tim.getNumberOfReleaseMethodCalls());
+		} finally {
+			try {
+				LibraryCacheManager.unregister(executionGraph.getJobID());
+			} catch (IOException ioe) {
+				// Ignore exception here
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
deleted file mode 100644
index f1e3191..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.queue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.List;
-
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import org.junit.Test;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.GraphConversionException;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
-import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.util.StringUtils;
-
-/**
- *         This class checks the functionality of the {@link QueueScheduler} class
- */
-public class QueueSchedulerTest {
-
-	/**
-	 * Test input task.
-	 * 
-	 */
-	public static final class InputTask extends AbstractGenericInputTask {
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void registerInputOutput() {
-			new RecordWriter<StringRecord>(this);
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void invoke() throws Exception {
-			// Nothing to do here
-		}
-
-	}
-
-	/**
-	 * Test output task.
-	 * 
-	 */
-	public static final class OutputTask extends AbstractOutputTask {
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void registerInputOutput() {
-			new RecordReader<StringRecord>(this, StringRecord.class);
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void invoke() throws Exception {
-			// Nothing to do here
-		}
-
-	}
-
-	/**
-	 * Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
-	 * 
-	 * @param channelType
-	 *        the channel type to connect the vertices with
-	 * @param instanceManager
-	 *        the instance manager that shall be used during the creation of the execution graph
-	 * @return a sample execution graph
-	 */
-	private ExecutionGraph createExecutionGraph(final ChannelType channelType, final InstanceManager instanceManager) {
-
-		final JobGraph jobGraph = new JobGraph("Job Graph");
-
-		final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
-		inputVertex.setInputClass(InputTask.class);
-		inputVertex.setNumberOfSubtasks(1);
-
-		final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
-		outputVertex.setOutputClass(OutputTask.class);
-		outputVertex.setNumberOfSubtasks(1);
-
-		try {
-			inputVertex.connectTo(outputVertex, channelType);
-		} catch (JobGraphDefinitionException e) {
-			fail(StringUtils.stringifyException(e));
-		}
-
-		try {
-			LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-			return new ExecutionGraph(jobGraph, instanceManager);
-
-		} catch (GraphConversionException e) {
-			fail(StringUtils.stringifyException(e));
-		} catch (IOException e) {
-			fail(StringUtils.stringifyException(e));
-		}
-
-		return null;
-	}
-
-	/**
-	 * Checks the behavior of the scheduleJob() method with a job consisting of two tasks connected via an in-memory
-	 * channel.
-	 */
-	@Test
-	public void testSchedulJobWithInMemoryChannel() {
-
-		final TestInstanceManager tim = new TestInstanceManager();
-		final TestDeploymentManager tdm = new TestDeploymentManager();
-		final QueueScheduler scheduler = new QueueScheduler(tdm, tim);
-
-		final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY, tim);
-
-		try {
-			try {
-				scheduler.schedulJob(executionGraph);
-			} catch (SchedulingException e) {
-				fail(StringUtils.stringifyException(e));
-			}
-
-			// Wait for the deployment to complete
-			tdm.waitForDeployment();
-
-			assertEquals(executionGraph.getJobID(), tdm.getIDOfLastDeployedJob());
-			final List<ExecutionVertex> listOfDeployedVertices = tdm.getListOfLastDeployedVertices();
-			assertNotNull(listOfDeployedVertices);
-			// Vertices connected via in-memory channels must be deployed in a single cycle.
-			assertEquals(2, listOfDeployedVertices.size());
-
-			// Check if the release of the allocated resources works properly by simulating the vertices' life cycle
-			assertEquals(0, tim.getNumberOfReleaseMethodCalls());
-
-			// Simulate vertex life cycle
-			for (final ExecutionVertex vertex : listOfDeployedVertices) {
-				vertex.updateExecutionState(ExecutionState.STARTING);
-				vertex.updateExecutionState(ExecutionState.RUNNING);
-				vertex.updateExecutionState(ExecutionState.FINISHING);
-				vertex.updateExecutionState(ExecutionState.FINISHED);
-			}
-
-			assertEquals(1, tim.getNumberOfReleaseMethodCalls());
-		} finally {
-			try {
-				LibraryCacheManager.unregister(executionGraph.getJobID());
-			} catch (IOException ioe) {
-				// Ignore exception here
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestDeploymentManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestDeploymentManager.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestDeploymentManager.java
index 9f3c190..a118455 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestDeploymentManager.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestDeploymentManager.java
@@ -16,7 +16,7 @@ package eu.stratosphere.nephele.jobmanager.scheduler.queue;
 import java.util.List;
 
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobmanager.DeploymentManager;
 
@@ -46,7 +46,7 @@ public class TestDeploymentManager implements DeploymentManager {
 
 
 	@Override
-	public void deploy(final JobID jobID, final AbstractInstance instance,
+	public void deploy(final JobID jobID, final Instance instance,
 			final List<ExecutionVertex> verticesToBeDeployed) {
 
 		this.jobID = jobID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestInstanceManager.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestInstanceManager.java
index 955d3a0..5a3977a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestInstanceManager.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/TestInstanceManager.java
@@ -16,32 +16,18 @@ package eu.stratosphere.nephele.jobmanager.scheduler.queue;
 import java.net.Inet4Address;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
+import eu.stratosphere.nephele.instance.*;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.topology.NetworkNode;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 import eu.stratosphere.util.StringUtils;
 
 /**
- * A dummy implementation of an {@link InstanceManager} used for the {@link QueueScheduler} unit tests.
+ * A dummy implementation of an {@link eu.stratosphere.nephele.instance.InstanceManager} used for the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} unit tests.
  * <p>
  * This class is thread-safe.
  * 
@@ -49,16 +35,6 @@ import eu.stratosphere.util.StringUtils;
 public final class TestInstanceManager implements InstanceManager {
 
 	/**
-	 * The default instance type to be used during the tests.
-	 */
-	private static final InstanceType INSTANCE_TYPE = InstanceTypeFactory.construct("test", 1, 1, 1024, 1024, 10);
-
-	/**
-	 * The instances this instance manager is responsible of.
-	 */
-	private final Map<InstanceType, InstanceTypeDescription> instanceMap = new HashMap<InstanceType, InstanceTypeDescription>();
-
-	/**
 	 * Counts the number of times the method releaseAllocatedResource is called.
 	 */
 	private volatile int numberOfReleaseCalls = 0;
@@ -74,16 +50,19 @@ public final class TestInstanceManager implements InstanceManager {
 	private final List<AllocatedResource> allocatedResources;
 
 	/**
-	 * Test implementation of {@link AbstractInstance}.
+	 * The single test instance wrapped by the allocated resource; it also determines the reported number of slots.
+	 */
+	private final TestInstance testInstance;
+
+	/**
+	 * Test implementation of {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 */
-	private static final class TestInstance extends AbstractInstance {
+	private static final class TestInstance extends Instance {
 
 		/**
 		 * Constructs a new test instance.
 		 * 
-		 * @param instanceType
-		 *        the instance type
 		 * @param instanceConnectionInfo
 		 *        the instance connection information
 		 * @param parentNode
@@ -92,11 +71,13 @@ public final class TestInstanceManager implements InstanceManager {
 		 *        the network topology
 		 * @param hardwareDescription
 		 *        the hardware description
+		 * @param numberSlots
+		 * 		  the number of slots available on the instance
 		 */
-		public TestInstance(final InstanceType instanceType, final InstanceConnectionInfo instanceConnectionInfo,
+		public TestInstance(final InstanceConnectionInfo instanceConnectionInfo,
 				final NetworkNode parentNode, final NetworkTopology networkTopology,
-				final HardwareDescription hardwareDescription) {
-			super(instanceType, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);
+				final HardwareDescription hardwareDescription, int numberSlots) {
+			super(instanceConnectionInfo, parentNode, networkTopology, hardwareDescription, numberSlots);
 		}
 	}
 
@@ -106,15 +87,13 @@ public final class TestInstanceManager implements InstanceManager {
 	public TestInstanceManager() {
 
 		final HardwareDescription hd = HardwareDescriptionFactory.construct(1, 1L, 1L);
-		final InstanceTypeDescription itd = InstanceTypeDescriptionFactory.construct(INSTANCE_TYPE, hd, 1);
-		instanceMap.put(INSTANCE_TYPE, itd);
 
 		this.allocatedResources = new ArrayList<AllocatedResource>();
 		try {
 			final InstanceConnectionInfo ici = new InstanceConnectionInfo(Inet4Address.getLocalHost(), 1, 1);
 			final NetworkTopology nt = new NetworkTopology();
-			final TestInstance ti = new TestInstance(INSTANCE_TYPE, ici, nt.getRootNode(), nt, hd);
-			this.allocatedResources.add(new AllocatedResource(ti, INSTANCE_TYPE, new AllocationID()));
+			this.testInstance = new TestInstance(ici, nt.getRootNode(), nt, hd, 1);
+			this.allocatedResources.add(new AllocatedResource(testInstance, new AllocationID()));
 		} catch (UnknownHostException e) {
 			throw new RuntimeException(StringUtils.stringifyException(e));
 		}
@@ -123,18 +102,7 @@ public final class TestInstanceManager implements InstanceManager {
 
 	@Override
 	public void requestInstance(final JobID jobID, final Configuration conf,
-			final InstanceRequestMap instanceRequestMap, final List<String> splitAffinityList) throws InstanceException {
-
-		if (instanceRequestMap.size() != 1) {
-			throw new InstanceException(
-				"requestInstance of TestInstanceManager expected to receive request for a single instance type");
-		}
-
-		if (instanceRequestMap.getMinimumNumberOfInstances(INSTANCE_TYPE) != 1) {
-			throw new InstanceException(
-				"requestInstance of TestInstanceManager expected to receive request for one instance of type "
-					+ INSTANCE_TYPE.getIdentifier());
-		}
+								int requiredSlots) throws InstanceException {
 
 		if (this.instanceListener == null) {
 			throw new InstanceException("instanceListener not registered with TestInstanceManager");
@@ -158,8 +126,7 @@ public final class TestInstanceManager implements InstanceManager {
 
 
 	@Override
-	public void releaseAllocatedResource(final JobID jobID, final Configuration conf,
-			final AllocatedResource allocatedResource) throws InstanceException {
+	public void releaseAllocatedResource(final AllocatedResource allocatedResource) throws InstanceException {
 
 		++this.numberOfReleaseCalls;
 	}
@@ -176,32 +143,16 @@ public final class TestInstanceManager implements InstanceManager {
 
 
 	@Override
-	public InstanceType getSuitableInstanceType(final int minNumComputeUnits, final int minNumCPUCores,
-			final int minMemorySize, final int minDiskCapacity, final int maxPricePerHour) {
-		throw new IllegalStateException("getSuitableInstanceType called on TestInstanceManager");
-	}
-
-
-	@Override
-	public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo,
-			final HardwareDescription hardwareDescription) {
+	public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo) {
 		throw new IllegalStateException("reportHeartBeat called on TestInstanceManager");
 	}
 
-
 	@Override
-	public InstanceType getInstanceTypeByName(final String instanceTypeName) {
-		throw new IllegalStateException("getInstanceTypeByName called on TestInstanceManager");
+	public void registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
+									final HardwareDescription hardwareDescription, int numberSlots){
+		throw new IllegalStateException("registerTaskManager called on TestInstanceManager.");
 	}
 
-
-	@Override
-	public InstanceType getDefaultInstanceType() {
-
-		return INSTANCE_TYPE;
-	}
-
-
 	@Override
 	public NetworkTopology getNetworkTopology(final JobID jobID) {
 		throw new IllegalStateException("getNetworkTopology called on TestInstanceManager");
@@ -214,27 +165,11 @@ public final class TestInstanceManager implements InstanceManager {
 		this.instanceListener = instanceListener;
 	}
 
-
-	@Override
-	public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-
-		return this.instanceMap;
-	}
-
-
 	@Override
-	public AbstractInstance getInstanceByName(final String name) {
+	public Instance getInstanceByName(final String name) {
 		throw new IllegalStateException("getInstanceByName called on TestInstanceManager");
 	}
 
-
-	@Override
-	public void cancelPendingRequests(final JobID jobID) {
-		throw new IllegalStateException("cancelPendingRequests called on TestInstanceManager");
-
-	}
-
-
 	@Override
 	public void shutdown() {
 		throw new IllegalStateException("shutdown called on TestInstanceManager");
@@ -244,4 +179,9 @@ public final class TestInstanceManager implements InstanceManager {
 	public int getNumberOfTaskTrackers() {
 		throw new IllegalStateException("getNumberOfTaskTrackers called on TestInstanceManager");
 	}
+
+	@Override
+	public int getNumberOfSlots() {
+		return this.testInstance.getNumberOfSlots();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
index 630f365..f1d164a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
@@ -131,7 +131,6 @@ public class ManagementGraphTest {
 		assertEquals(origVertex.getExecutionState(), copyVertex.getExecutionState());
 		assertEquals(origVertex.getIndexInGroup(), copyVertex.getIndexInGroup());
 		assertEquals(origVertex.getInstanceName(), copyVertex.getInstanceName());
-		assertEquals(origVertex.getInstanceType(), copyVertex.getInstanceType());
 		assertEquals(origVertex.getNumberOfInputGates(), copyVertex.getNumberOfInputGates());
 		assertEquals(origVertex.getNumberOfOutputGates(), copyVertex.getNumberOfOutputGates());
 
@@ -248,15 +247,15 @@ public class ManagementGraphTest {
 
 		// Vertices
 		final ManagementVertex vertex1_1 = new ManagementVertex(groupVertex1, new ManagementVertexID(), "Host 1",
-			"small", 0);
+			0);
 		final ManagementVertex vertex2_1 = new ManagementVertex(groupVertex2, new ManagementVertexID(), "Host 2",
-			"medium", 0);
+			0);
 		final ManagementVertex vertex2_2 = new ManagementVertex(groupVertex2, new ManagementVertexID(), "Host 2",
-			"medium", 1);
+			1);
 		final ManagementVertex vertex3_1 = new ManagementVertex(groupVertex3, new ManagementVertexID(), "Host 2",
-			"medium", 0);
+			0);
 		final ManagementVertex vertex4_1 = new ManagementVertex(groupVertex4, new ManagementVertexID(), "Host 2",
-			"medium", 0);
+			0);
 
 		// Input/output gates
 		final ManagementGate outputGate1_1 = new ManagementGate(vertex1_1, new ManagementGateID(), 0, false);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerITCase.java
index 47de74f..02190ca 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerITCase.java
@@ -62,7 +62,7 @@ public class IOManagerITCase {
 
 	@Before
 	public void beforeTest() {
-		memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE);
+		memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1);
 		ioManager = new IOManager();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerPerformanceBenchmark.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerPerformanceBenchmark.java
index 1f0c509..7936a95 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerPerformanceBenchmark.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerPerformanceBenchmark.java
@@ -69,7 +69,7 @@ public class IOManagerPerformanceBenchmark
 	@Before
 	public void startup()
 	{
-		memManager = new DefaultMemoryManager(MEMORY_SIZE);
+		memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
 		ioManager = new IOManager();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerTest.java
index a4d92f1..460d546 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/iomanager/IOManagerTest.java
@@ -44,7 +44,7 @@ public class IOManagerTest
 	@Before
 	public void beforeTest()
 	{
-		this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024);
+		this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1);
 		this.ioManager = new IOManager();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/memorymanager/MemorySegmentTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/memorymanager/MemorySegmentTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/memorymanager/MemorySegmentTest.java
index 0d12582..fdd6448 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/memorymanager/MemorySegmentTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/services/memorymanager/MemorySegmentTest.java
@@ -46,7 +46,7 @@ public class MemorySegmentTest {
 	@Before
 	public void setUp() throws Exception{
 		try {
-			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, PAGE_SIZE);
+			this.manager = new DefaultMemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE);
 			this.segment = manager.allocatePages(new DefaultMemoryManagerTest.DummyInvokable(), 1).get(0);
 			this.random = new Random(RANDOM_SEED);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
index d6cb9b0..4202880 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
@@ -24,14 +24,11 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.jobmanager.JobManagerITCase;
 import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
 
@@ -55,6 +52,8 @@ public final class ServerTestUtils {
 	 */
 	private static final String ECLIPSE_PATH_EXTENSION = "/src/test/resources";
 
+	private static final String INTELLIJ_PATH_EXTENSION = "/stratosphere-runtime/src/test/resources";
+
 	/**
 	 * Private constructor.
 	 */
@@ -201,6 +200,12 @@ public final class ServerTestUtils {
 			return configDir;
 		}
 
+		configDir = System.getProperty(USER_DIR_KEY) + INTELLIJ_PATH_EXTENSION + CORRECT_CONF_DIR;
+
+		if(new File(configDir).exists()){
+			return configDir;
+		}
+
 		return null;
 	}
 
@@ -217,12 +222,8 @@ public final class ServerTestUtils {
 	public static void waitForJobManagerToBecomeReady(final ExtendedManagementProtocol jobManager) throws IOException,
 			InterruptedException {
 
-		Map<InstanceType, InstanceTypeDescription> instanceMap = jobManager.getMapOfAvailableInstanceTypes();
-
-		while (instanceMap.isEmpty()) {
-
+		while (jobManager.getAvailableSlots() == 0) {
 			Thread.sleep(100);
-			instanceMap = jobManager.getMapOfAvailableInstanceTypes();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
index ab57a18..a28ba38 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
@@ -98,7 +98,7 @@ public class HashMatchIteratorITCase {
 		this.pairRecordPairComparator = new IntPairRecordPairComparator();
 		this.recordPairPairComparator = new RecordIntPairPairComparator();
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManager();
 	}
 
@@ -150,7 +150,7 @@ public class HashMatchIteratorITCase {
 					new BuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, ioManager, this.parentTask, 1.0);
 			
 			iterator.open();
 			
@@ -237,7 +237,7 @@ public class HashMatchIteratorITCase {
 					new BuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, ioManager, this.parentTask, 1.0);
 
 			iterator.open();
 			
@@ -286,7 +286,7 @@ public class HashMatchIteratorITCase {
 				new BuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, MEMORY_SIZE);
+					this.memoryManager, ioManager, this.parentTask, 1.0);
 
 			iterator.open();
 			
@@ -373,7 +373,7 @@ public class HashMatchIteratorITCase {
 				new BuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, MEMORY_SIZE);
+					this.memoryManager, ioManager, this.parentTask, 1.0);
 			
 			iterator.open();
 			
@@ -420,7 +420,7 @@ public class HashMatchIteratorITCase {
 					new BuildSecondHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0);
 			
 			iterator.open();
 			
@@ -467,7 +467,7 @@ public class HashMatchIteratorITCase {
 					new BuildFirstHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashTableITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashTableITCase.java
index a845318..4a2fd7d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashTableITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashTableITCase.java
@@ -91,7 +91,7 @@ public class HashTableITCase {
 		this.pairProbeSideComparator = new IntPairComparator();
 		this.pairComparator = new IntPairPairComparator();
 		
-		this.memManager = new DefaultMemoryManager(32 * 1024 * 1024);
+		this.memManager = new DefaultMemoryManager(32 * 1024 * 1024,1);
 		this.ioManager = new IOManager();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
index 4a4e13a..d9c8b08 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
@@ -116,7 +116,7 @@ public class ReOpenableHashTableITCase {
 		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
 		this.ioManager = new IOManager();
 	}
 
@@ -238,7 +238,7 @@ public class ReOpenableHashTableITCase {
 				new BuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, MEMORY_SIZE);
+					this.memoryManager, ioManager, this.parentTask, 1.0);
 		
 		iterator.open();
 		// do first join with both inputs

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
index ffc4ae7..fbe4f5b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
@@ -73,7 +73,7 @@ public class ChannelViewsTest
 
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, MEMORY_PAGE_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
 		this.ioManager = new IOManager();
 	}
 
@@ -189,7 +189,7 @@ public class ChannelViewsTest
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
 		final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
-		
+
 		// write a number of pairs
 		final Record rec = new Record();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
@@ -197,13 +197,13 @@ public class ChannelViewsTest
 			rec.write(outView);
 		}
 		this.memoryManager.release(outView.close());
-		
+
 		// create the reader input view
 		memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
 		final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
 		final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, outView.getBlockCount(), true);
 		generator.reset();
-		
+
 		// read and re-generate all records and compare them
 		try {
 			final Record readRec = new Record();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
index fbf363d..1809540 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
@@ -64,7 +64,7 @@ public class SpillingBufferTest {
 
 	@Before
 	public void beforeTest() {
-		memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		ioManager = new IOManager();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/event/EventWithAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/event/EventWithAggregatorsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/event/EventWithAggregatorsTest.java
index dcda405..8204eed 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/event/EventWithAggregatorsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/event/EventWithAggregatorsTest.java
@@ -117,6 +117,8 @@ public class EventWithAggregatorsTest {
 	
 	private static class TestAggregator<T extends Value> implements Aggregator<T> {
 
+		private static final long serialVersionUID = 1L;
+		
 		private final T val;
 		
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableIteratorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableIteratorTest.java
index c7b4644..49afc3a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableIteratorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableIteratorTest.java
@@ -50,7 +50,7 @@ public class BlockResettableIteratorTest
 	@Before
 	public void startup() {
 		// set up IO and memory manager
-		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY);
+		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
 		
 		// create test objects
 		this.objects = new ArrayList<Record>(20000);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableMutableObjectIteratorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableMutableObjectIteratorTest.java
index 4b562bf..e3349fd 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableMutableObjectIteratorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/resettable/BlockResettableMutableObjectIteratorTest.java
@@ -52,7 +52,7 @@ public class BlockResettableMutableObjectIteratorTest
 	@Before
 	public void startup() {
 		// set up IO and memory manager
-		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY);
+		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
 		
 		// create test objects
 		this.objects = new ArrayList<Record>(20000);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
index b416b73..26ce081 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
@@ -72,7 +72,7 @@ public class AsynchonousPartialSorterITCase
 	@Before
 	public void beforeTest()
 	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1);
 		this.ioManager = new IOManager();
 		this.serializer = RecordSerializerFactory.get();
 		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
@@ -107,7 +107,7 @@ public class AsynchonousPartialSorterITCase
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 32 * 1024 * 1024);
+				this.parentTask, this.serializer, this.comparator, 1.0);
 	
 			runPartialSorter(sorter, NUM_RECORDS, 0);
 		}
@@ -130,7 +130,7 @@ public class AsynchonousPartialSorterITCase
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 32 * 1024 * 1024);
+				this.parentTask, this.serializer, this.comparator, 1.0);
 	
 			runPartialSorter(sorter, NUM_RECORDS, 2);
 		}
@@ -153,7 +153,7 @@ public class AsynchonousPartialSorterITCase
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 32 * 1024 * 1024);
+				this.parentTask, this.serializer, this.comparator, 1.0);
 	
 			runPartialSorter(sorter, NUM_RECORDS, 28);
 		}
@@ -178,7 +178,7 @@ public class AsynchonousPartialSorterITCase
 				// merge iterator
 				LOG.debug("Initializing sortmerger...");
 				sorter = new ExceptionThrowingAsynchronousPartialSorter<Record>(this.memoryManager, source,
-						this.parentTask, this.serializer, this.comparator, 32 * 1024 * 1024);
+						this.parentTask, this.serializer, this.comparator, 1.0);
 		
 				runPartialSorter(sorter, NUM_RECORDS, 0);
 				
@@ -283,10 +283,10 @@ public class AsynchonousPartialSorterITCase
 		public ExceptionThrowingAsynchronousPartialSorter(MemoryManager memoryManager,
 				MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 				TypeSerializerFactory<E> serializer, TypeComparator<E> comparator,
-				long totalMemory)
+				double memoryFraction)
 		throws IOException, MemoryAllocationException
 		{
-			super(memoryManager, input, parentTask, serializer, comparator, totalMemory);
+			super(memoryManager, input, parentTask, serializer, comparator, memoryFraction);
 		}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
index 09777a3..1851480 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
@@ -85,7 +85,7 @@ public class CombiningUnilateralSortMergerITCase {
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManager();
 		
 		this.serializerFactory = RecordSerializerFactory.get();
@@ -121,7 +121,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				64 * 1024 * 1024, 64, 0.7f);
+				0.25, 64, 0.7f);
 
 		final Record rec = new Record();
 		rec.setField(1, new IntValue(1));
@@ -162,7 +162,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				3 * 1024 * 1024, 64, 0.005f);
+				0.01, 64, 0.005f);
 
 		final Record rec = new Record();
 		rec.setField(1, new IntValue(1));
@@ -211,7 +211,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				64 * 1024 * 1024, 2, 0.7f);
+				0.25, 2, 0.7f);
 
 		// emit data
 		LOG.debug("emitting data");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
index bd68382..7ba42b9 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
@@ -76,7 +76,7 @@ public class ExternalSortITCase {
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManager();
 		
 		this.pactRecordSerializer = RecordSerializerFactory.get();
@@ -113,7 +113,7 @@ public class ExternalSortITCase {
 		
 		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 			source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-			64 * 1024 * 1024, 2, 0.9f);
+				(double)64/78, 2, 0.9f);
 
 		// emit data
 		LOG.debug("Reading and sorting data...");
@@ -159,7 +159,7 @@ public class ExternalSortITCase {
 		
 		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				64 * 1024 * 1024, 10, 2, 0.9f);
+				(double)64/78, 10, 2, 0.9f);
 
 		// emit data
 		LOG.debug("Reading and sorting data...");
@@ -205,7 +205,7 @@ public class ExternalSortITCase {
 		
 		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				16 * 1024 * 1024, 64, 0.7f);
+				(double)16/78, 64, 0.7f);
 
 		// emit data
 		LOG.debug("Reading and sorting data...");
@@ -254,7 +254,7 @@ public class ExternalSortITCase {
 		
 		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				64 * 1024 * 1024, 16, 0.7f);
+				(double)64/78, 16, 0.7f);
 		
 		// emit data
 		LOG.debug("Emitting data...");
@@ -307,7 +307,7 @@ public class ExternalSortITCase {
 		LOG.debug("Initializing sortmerger...");
 		
 		Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, 
-				generator, this.parentTask, serializerFactory, comparator, 64 * 1024 * 1024, 4, 0.7f);
+				generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f);
 
 		// emit data
 		LOG.debug("Emitting data...");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
index cb570c4..f76b802 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
@@ -80,7 +80,7 @@ public class MassiveStringSortingITCase {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024);
+				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManager();
 					
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
@@ -170,7 +170,7 @@ public class MassiveStringSortingITCase {
 			BufferedReader verifyReader = null;
 			
 			try {
-				MemoryManager mm = new DefaultMemoryManager(1024 * 1024);
+				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManager();
 					
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<String, String[]>");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
index 8437c7e..0f3f558 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
@@ -88,7 +88,7 @@ public class SortMergeMatchIteratorITCase
 		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
 		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
 		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManager();
 	}
 


Mime
View raw message