flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/22] Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Sun, 22 Jun 2014 21:47:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskExternalITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskExternalITCase.java
index 6fd366e..2d8113b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskExternalITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskExternalITCase.java
@@ -33,6 +33,8 @@ import eu.stratosphere.types.Record;
 public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce<Record, ?>> {
 	
 	private static final long COMBINE_MEM = 3 * 1024 * 1024;
+
+	private final double combine_frac;
 	
 	private final ArrayList<Record> outList = new ArrayList<Record>();
 	
@@ -42,6 +44,8 @@ public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce
 
 	public CombineTaskExternalITCase() {
 		super(COMBINE_MEM, 0);
+
+		combine_frac = (double)COMBINE_MEM/this.getMemoryManager().getMemorySize();
 	}
 
 	
@@ -55,7 +59,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setMemoryDriver(COMBINE_MEM);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
 		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
@@ -108,7 +112,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setMemoryDriver(COMBINE_MEM);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
 		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskTest.java
index 0917051..98e7003 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CombineTaskTest.java
@@ -39,6 +39,8 @@ import eu.stratosphere.util.Collector;
 public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?>>
 {
 	private static final long COMBINE_MEM = 3 * 1024 * 1024;
+
+	private final double combine_frac;
 	
 	private final ArrayList<Record> outList = new ArrayList<Record>();
 	
@@ -48,6 +50,8 @@ public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?
 
 	public CombineTaskTest() {
 		super(COMBINE_MEM, 0);
+
+		combine_frac = (double)COMBINE_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Test
@@ -60,7 +64,7 @@ public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setMemoryDriver(COMBINE_MEM);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
 		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
@@ -96,7 +100,7 @@ public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?
 		setOutput(new DiscardingOutputCollector<Record>());
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setMemoryDriver(COMBINE_MEM);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
 		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();
@@ -120,7 +124,7 @@ public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?
 		setOutput(new DiscardingOutputCollector<Record>());
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setMemoryDriver(COMBINE_MEM);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
 		final GroupReduceCombineDriver<Record> testTask = new GroupReduceCombineDriver<Record>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskExternalITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskExternalITCase.java
index 19e6209..fdf1941 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskExternalITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskExternalITCase.java
@@ -27,11 +27,14 @@ import eu.stratosphere.types.Record;
 public class CrossTaskExternalITCase extends DriverTestBase<GenericCrosser<Record, Record, Record>>
 {
 	private static final long CROSS_MEM = 1024 * 1024;
+
+	private final double cross_frac;
 	
 	private final CountingOutputCollector output = new CountingOutputCollector();
 
 	public CrossTaskExternalITCase() {
 		super(CROSS_MEM, 0);
+		cross_frac = (double)CROSS_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Test
@@ -52,7 +55,7 @@ public class CrossTaskExternalITCase extends DriverTestBase<GenericCrosser<Recor
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -84,7 +87,7 @@ public class CrossTaskExternalITCase extends DriverTestBase<GenericCrosser<Recor
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskTest.java
index 8667a6f..baa9589 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/CrossTaskTest.java
@@ -32,11 +32,15 @@ import eu.stratosphere.util.Collector;
 public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record, Record>>
 {
 	private static final long CROSS_MEM = 1024 * 1024;
+
+	private final double cross_frac;
 	
 	private final CountingOutputCollector output = new CountingOutputCollector();
 
 	public CrossTaskTest() {
 		super(CROSS_MEM, 0);
+
+		cross_frac = (double)CROSS_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Test
@@ -56,7 +60,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -86,7 +90,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -114,7 +118,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -144,7 +148,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -175,7 +179,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -206,7 +210,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -234,7 +238,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -263,7 +267,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -294,7 +298,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -324,7 +328,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -354,7 +358,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -384,7 +388,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
 				
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -411,7 +415,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new DelayingInfinitiveInputIterator(100));
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -454,7 +458,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new DelayingInfinitiveInputIterator(100));
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -497,7 +501,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new DelayingInfinitiveInputIterator(100));
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		
@@ -540,7 +544,7 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		addInput(new DelayingInfinitiveInputIterator(100));
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
-		getTaskConfig().setMemoryDriver(CROSS_MEM);
+		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
 		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
index bfd0d42..67f9fe8 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
@@ -42,13 +42,13 @@ import eu.stratosphere.types.IntValue;
 import eu.stratosphere.types.Key;
 import eu.stratosphere.types.Record;
 
-public class DataSinkTaskTest extends TaskTestBase {
-
-	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
+public class DataSinkTaskTest extends TaskTestBase
+{
+	private static final Log LOG = LogFactory.getLog(DataSinkTaskTest.class);
+	
+	private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
-
-	private static final Log LOG = LogFactory.getLog(DataSinkTaskTest.class);
 	
 	private final String tempTestPath = Path.constructTestPath("dst_test");
 	
@@ -65,7 +65,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-
+		
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 		
@@ -131,7 +131,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-
+		
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0);
@@ -201,8 +201,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-
-		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
+		double memoryFraction = 1.0;
+		
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 		
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -210,8 +211,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
-		super.getTaskConfig().setMemoryInput(0, 4 * 1024 * 1024);
+				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
+				0);
+		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
@@ -279,7 +281,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		int keyCnt = 100;
 		int valCnt = 20;
-
+		
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
@@ -310,9 +312,10 @@ public class DataSinkTaskTest extends TaskTestBase {
 	public void testFailingSortingDataSinkTask() {
 
 		int keyCnt = 100;
-		int valCnt = 20;
-
-		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
+		int valCnt = 20;;
+		double memoryFraction = 1.0;
+		
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
 		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -322,8 +325,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
-		super.getTaskConfig().setMemoryInput(0, 4 * 1024 * 1024);
+				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
+				0);
+		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 		
@@ -347,7 +351,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 	
 	@Test
 	public void testCancelDataSinkTask() {
-
+		
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 		
@@ -389,8 +393,9 @@ public class DataSinkTaskTest extends TaskTestBase {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testCancelSortingDataSinkTask() {
-
-		super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
+		double memoryFraction = 1.0;
+		
+		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 		
 		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -402,7 +407,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 		super.getTaskConfig().setInputComparator(
 				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 
 				0);
-		super.getTaskConfig().setMemoryInput(0, 4 * 1024 * 1024);
+		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskExternalITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskExternalITCase.java
index 7ea8ea4..dd77059 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskExternalITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskExternalITCase.java
@@ -35,7 +35,11 @@ public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record
 	private static final long SORT_MEM = 3*1024*1024;
 	
 	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-	
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
+
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
 		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
@@ -48,6 +52,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record
 	
 	public MatchTaskExternalITCase() {
 		super(HASH_MEM, 2, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Test
@@ -65,7 +71,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -99,7 +105,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.output);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -130,7 +136,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.output);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskTest.java
index ce5a8c5..de56c0b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/MatchTaskTest.java
@@ -40,8 +40,14 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 	private static final long HASH_MEM = 6*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;
+
+	private static final int NUM_SORTER = 2;
 	
 	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
@@ -55,7 +61,9 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 	
 	
 	public MatchTaskTest() {
-		super(HASH_MEM, 2, SORT_MEM);
+		super(HASH_MEM, NUM_SORTER, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	
@@ -72,7 +80,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -106,7 +114,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -142,7 +150,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -178,7 +186,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -214,7 +222,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -250,7 +258,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -286,7 +294,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -321,7 +329,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -357,7 +365,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -386,7 +394,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -437,7 +445,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -488,7 +496,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		addInputComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setMemoryDriver(BNLJN_MEM);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
@@ -539,7 +547,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -570,7 +578,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -601,7 +609,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -632,7 +640,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -663,7 +671,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -694,7 +702,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -724,7 +732,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -755,7 +763,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		setOutput(new NirvanaOutputList());
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -799,7 +807,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -843,7 +851,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		
@@ -887,7 +895,7 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setMemoryDriver(HASH_MEM);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
 		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskExternalITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskExternalITCase.java
index 7b14137..bd3524d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskExternalITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskExternalITCase.java
@@ -130,7 +130,9 @@ public class ReduceTaskExternalITCase extends DriverTestBase<GenericGroupReduce<
 		try {
 			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f);
+				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
+					this.perSortFractionMem,
+					2, 0.8f);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
@@ -174,7 +176,9 @@ public class ReduceTaskExternalITCase extends DriverTestBase<GenericGroupReduce<
 		try {
 			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f);
+				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
+					this.perSortFractionMem,
+					2, 0.8f);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskTest.java
index ad859f4..a968ce2 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/ReduceTaskTest.java
@@ -125,7 +125,8 @@ public class ReduceTaskTest extends DriverTestBase<GenericGroupReduce<Record, Re
 		try {
 			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortMem, 4, 0.8f);
+				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
+					4, 0.8f);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
index dda215e..3e9a04e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
@@ -43,11 +43,11 @@ import eu.stratosphere.util.LogUtils;
 
 
 public class ChainTaskTest extends TaskTestBase {
-
+	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
-
+	
 	private final List<Record> outList = new ArrayList<Record>();
 	
 	@SuppressWarnings("unchecked")
@@ -67,10 +67,13 @@ public class ChainTaskTest extends TaskTestBase {
 	public void testMapTask() {
 		final int keyCnt = 100;
 		final int valCnt = 20;
+
+		final double memoryFraction = 1.0;
 		
 		try {
+		
 			// environment
-			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+			initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 			addOutput(this.outList);
 			
@@ -89,7 +92,7 @@ public class ChainTaskTest extends TaskTestBase {
 				// driver
 				combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 				combineConfig.setDriverComparator(compFact, 0);
-				combineConfig.setMemoryDriver(3 * 1024 * 1024);
+				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
 				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class));
@@ -123,10 +126,14 @@ public class ChainTaskTest extends TaskTestBase {
 	public void testFailingMapTask() {
 		int keyCnt = 100;
 		int valCnt = 20;
+
+		final long memorySize = 1024 * 1024 * 3;
+		final int bufferSize = 1014*1024;
+		final double memoryFraction = 1.0;
 		
 		try {
 			// environment
-			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+			initEnvironment(memorySize, bufferSize);
 			addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 			addOutput(this.outList);
 	
@@ -145,7 +152,7 @@ public class ChainTaskTest extends TaskTestBase {
 				// driver
 				combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 				combineConfig.setDriverComparator(compFact, 0);
-				combineConfig.setMemoryDriver(3 * 1024 * 1024);
+				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
 				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/ReduceCombineDriverTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/ReduceCombineDriverTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/ReduceCombineDriverTest.java
index ef88cb9..4467e30 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/ReduceCombineDriverTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/ReduceCombineDriverTest.java
@@ -43,7 +43,7 @@ public class ReduceCombineDriverTest {
 		try {
 			TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
 					new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
-			context.getTaskConfig().setMemoryDriver(512 * 1024);
+			context.getTaskConfig().setRelativeMemoryDriver(0.5);
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			Collections.shuffle(data);
@@ -80,7 +80,7 @@ public class ReduceCombineDriverTest {
 			{
 				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
 						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
-				context.getTaskConfig().setMemoryDriver(512 * 1024);
+				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				Collections.shuffle(data);
@@ -111,7 +111,7 @@ public class ReduceCombineDriverTest {
 			{
 				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
 						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
-				context.getTaskConfig().setMemoryDriver(512 * 1024);
+				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				Collections.shuffle(data);
@@ -152,7 +152,7 @@ public class ReduceCombineDriverTest {
 			{
 				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
 						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
-				context.getTaskConfig().setMemoryDriver(512 * 1024);
+				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -180,7 +180,7 @@ public class ReduceCombineDriverTest {
 			{
 				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
 						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
-				context.getTaskConfig().setMemoryDriver(512 * 1024);
+				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/TestTaskContext.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/TestTaskContext.java
index 78b0709..f458ae9 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/TestTaskContext.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/drivers/TestTaskContext.java
@@ -62,7 +62,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	public TestTaskContext() {}
 	
 	public TestTaskContext(long memoryInBytes) {
-		this.memoryManager = new DefaultMemoryManager(memoryInBytes, 32 * 1024);
+		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DriverTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DriverTestBase.java
index c1e2ea8..531382e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DriverTestBase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DriverTestBase.java
@@ -64,6 +64,8 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	private final TaskConfig taskConfig;
 	
 	protected final long perSortMem;
+
+	protected final double perSortFractionMem;
 	
 	private Collector<Record> output;
 	
@@ -95,8 +97,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		final long totalMem = Math.max(memory, 0) + (Math.max(maxNumSorters, 0) * perSortMemory);
 		
 		this.perSortMem = perSortMemory;
+		this.perSortFractionMem = (double)perSortMemory/totalMem;
 		this.ioManager = new IOManager();
-		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem) : null;
+		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
 		
 		this.inputs = new ArrayList<MutableObjectIterator<Record>>();
 		this.comparators = new ArrayList<TypeComparator<Record>>();
@@ -115,7 +118,8 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	
 	public void addInputSorted(MutableObjectIterator<Record> input, RecordComparator comp) throws Exception {
 		UnilateralSortMerger<Record> sorter = new UnilateralSortMerger<Record>(
-				this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp, this.perSortMem, 32, 0.8f);
+				this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp,
+				this.perSortFractionMem, 32, 0.8f);
 		this.sorters.add(sorter);
 		this.inputs.add(null);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index 2585a74..ab1d4e4 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -74,7 +74,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		this.inputs = new LinkedList<InputGate<Record>>();
 		this.outputs = new LinkedList<OutputGate>();
 
-		this.memManager = new DefaultMemoryManager(memorySize);
+		this.memManager = new DefaultMemoryManager(memorySize, 1);
 		this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
 		this.inputSplitProvider = inputSplitProvider;
 		this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
@@ -309,14 +309,13 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	}
 
 	@Override
-	public OutputGate createAndRegisterOutputGate()
-	{
+	public OutputGate createAndRegisterOutputGate() {
 		return this.outputs.remove(0);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate()
-	{
+	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
 		return (InputGate<T>) this.inputs.remove(0);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
index f695979..1ee9293 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -354,7 +354,7 @@ public class InboundEnvelopeDecoderTest {
 		buf.readerIndex(0);
 		ByteBuf[] slices = randomSlices(buf);
 
-		ch.writeInbound(slices);
+		ch.writeInbound((Object) slices);
 
 		for (ByteBuf slice : slices) {
 			Assert.assertEquals(1, slice.refCnt());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-test-utils/src/main/java/eu/stratosphere/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/compiler/util/CompilerTestBase.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/compiler/util/CompilerTestBase.java
index 7c37bec..8d07163 100644
--- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/compiler/util/CompilerTestBase.java
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/compiler/util/CompilerTestBase.java
@@ -12,7 +12,6 @@
  **********************************************************************************************************************/
 package eu.stratosphere.test.compiler.util;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,12 +31,6 @@ import eu.stratosphere.compiler.costs.DefaultCostEstimator;
 import eu.stratosphere.compiler.plan.OptimizedPlan;
 import eu.stratosphere.compiler.plan.PlanNode;
 import eu.stratosphere.compiler.plan.SingleInputPlanNode;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
 import eu.stratosphere.util.OperatingSystem;
 
 /**
@@ -63,37 +56,28 @@ public abstract class CompilerTestBase {
 	
 	protected PactCompiler noStatsCompiler;
 	
-	protected InstanceTypeDescription instanceType;
-	
 	private int statCounter;
 	
 	// ------------------------------------------------------------------------	
 	
 	@Before
-	public void setup() {
-		InetSocketAddress dummyAddr = new InetSocketAddress("localhost", 12345);
-		
+	public void setup() {		
 		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator(), dummyAddr);
+		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
 		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 		
-		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator(), dummyAddr);
+		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
 		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-		
-		// create the instance type description
-		InstanceType iType = InstanceTypeFactory.construct("standard", 6, 2, 4096, 100, 0);
-		HardwareDescription hDesc = HardwareDescriptionFactory.construct(2, 4096 * 1024 * 1024, 2000 * 1024 * 1024);
-		this.instanceType = InstanceTypeDescriptionFactory.construct(iType, hDesc, DEFAULT_PARALLELISM * 2);
 	}
 	
 	// ------------------------------------------------------------------------
 	
 	public OptimizedPlan compileWithStats(Plan p) {
-		return this.withStatsCompiler.compile(p, this.instanceType);
+		return this.withStatsCompiler.compile(p);
 	}
 	
 	public OptimizedPlan compileNoStats(Plan p) {
-		return this.noStatsCompiler.compile(p, this.instanceType);
+		return this.noStatsCompiler.compile(p);
 	}
 	
 	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/AbstractTestBase.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/AbstractTestBase.java
index 28a2417..2873d86 100644
--- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/AbstractTestBase.java
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/AbstractTestBase.java
@@ -45,26 +45,29 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.util.LogUtils;
 
 public abstract class AbstractTestBase {
-	private static final int DEFAULT_NUM_TASK_MANAGER = 1;
+	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
 	
-	private static final int MINIMUM_HEAP_SIZE_MB = 192;
-	
-	private static final long MEMORY_SIZE = 80;
+	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
+
+	protected static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+
+	protected static final int DEFAULT_NUM_TASK_TRACKER = 1;
 
-	private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
-	
 	protected final Configuration config;
 	
 	protected NepheleMiniCluster executor;
 	
 	private final List<File> tempFiles;
-	
-		
+
+	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+
+	protected int numTaskTracker = DEFAULT_NUM_TASK_TRACKER;
+
 	public AbstractTestBase(Configuration config) {
 		verifyJvmOptions();
 		this.config = config;
 		this.tempFiles = new ArrayList<File>();
-		
+
 		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
 	}
 
@@ -73,15 +76,6 @@ public abstract class AbstractTestBase {
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Getter/Setter
-	// --------------------------------------------------------------------------------------------
-
-	public int getNumTaskManager() { return numTaskManager; }
-
-	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
-
 	// --------------------------------------------------------------------------------------------
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
@@ -91,8 +85,9 @@ public abstract class AbstractTestBase {
 		this.executor = new NepheleMiniCluster();
 		this.executor.setDefaultOverwriteFiles(true);
 		this.executor.setLazyMemoryAllocation(true);
-		this.executor.setMemorySize(MEMORY_SIZE);
-		this.executor.setNumTaskManager(this.numTaskManager);
+		this.executor.setMemorySize(TASK_MANAGER_MEMORY_SIZE);
+		this.executor.setTaskManagerNumSlots(taskManagerNumSlots);
+		this.executor.setNumTaskTracker(this.numTaskTracker);
 		this.executor.start();
 	}
 
@@ -109,6 +104,19 @@ public abstract class AbstractTestBase {
 			deleteAllTempFiles();
 		}
 	}
+
+	//------------------
+	// Accessors
+	//------------------
+
+	public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
+
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+	public int getNumTaskTracker() { return numTaskTracker; }
+
+	public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
+
 	
 	// --------------------------------------------------------------------------------------------
 	//  Temporary File Utilities

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/JavaProgramTestBase.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/JavaProgramTestBase.java
index dc83a56..2aa000a 100644
--- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/JavaProgramTestBase.java
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/JavaProgramTestBase.java
@@ -45,11 +45,13 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	public JavaProgramTestBase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(degreeOfParallelism);
 	}
 	
 	
 	public void setDegreeOfParallelism(int degreeOfParallelism) {
 		this.degreeOfParallelism = degreeOfParallelism;
+		setTaskManagerNumSlots(degreeOfParallelism);
 	}
 	
 	public JobExecutionResult getLatestExecutionResult() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/RecordAPITestBase.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/RecordAPITestBase.java
index 45be660..23e7c2b 100644
--- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/RecordAPITestBase.java
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/util/RecordAPITestBase.java
@@ -28,6 +28,8 @@ import eu.stratosphere.nephele.client.JobClient;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 
 public abstract class RecordAPITestBase extends AbstractTestBase {
+
+	protected static final int DOP = 4;
 	
 	protected JobExecutionResult jobExecutionResult;
 	
@@ -40,6 +42,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 	
 	public RecordAPITestBase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/accumulators/AccumulatorITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/accumulators/AccumulatorITCase.java
index a14ee03..18bc3e9 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/accumulators/AccumulatorITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/accumulators/AccumulatorITCase.java
@@ -71,13 +71,14 @@ public class AccumulatorITCase extends RecordAPITestBase {
 	private static final String INPUT = "one\n" + "two two\n" + "three three three\n";
 	private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
 	
-	private static final int NUM_SUBTASKS = 2;
+	private static final int DOP = 2;
 
 	protected String dataPath;
 	protected String resultPath;
 	
 	public AccumulatorITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -97,7 +98,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		
 		Assert.assertEquals(new Integer(3), (Integer) res.getAccumulatorResult("num-lines"));
 
-		Assert.assertEquals(new Double(NUM_SUBTASKS), (Double)res.getAccumulatorResult("open-close-counter"));
+		Assert.assertEquals(new Double(DOP), (Double)res.getAccumulatorResult("open-close-counter"));
 		
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();
@@ -121,7 +122,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("IterationAllReducer#NoSubtasks", NUM_SUBTASKS);
+		config1.setInteger("IterationAllReducer#NoSubtasks", DOP);
 		return toParameterList(config1);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
index b80810b..75c50fc 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -62,13 +62,17 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 
 	private static final int NUM_FEATURES = 3;
 
+	private static final int DOP = 4;
+
 	protected String pointsPath;
 
 	protected String modelsPath;
 
 	protected String resultPath;
 
-
+	public BroadcastVarsNepheleITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 
 	public static final String getInputPoints(int numPoints, int numDimensions, long seed) {
@@ -122,7 +126,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, 4);
+		return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, DOP);
 	}
 
 	@Override
@@ -222,7 +226,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	@SuppressWarnings("unchecked")
 	private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks, numSubTasks);
+		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -236,7 +240,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	@SuppressWarnings("unchecked")
 	private static JobInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks, numSubTasks);
+		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -248,7 +252,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	}
 
 	private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobTaskVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks, numSubTasks);
+		JobTaskVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -272,7 +276,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	}
 
 	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 8cda32f..5e86af5 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -59,7 +59,11 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	private static final int ITERATION_ID = 42;
 	
 	private static final int MEMORY_PER_CONSUMER = 2;
-	
+
+	private static final int DOP = 4;
+
+	private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+
 	protected String dataPath;
 	protected String clusterPath;
 	protected String resultPath;
@@ -67,6 +71,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	
 	public KMeansIterativeNepheleITCase() {
 		LogUtils.initializeDefaultConsoleLogger(Level.ERROR);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	@Override
@@ -83,7 +88,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraph(dataPath, clusterPath, this.resultPath, 4, 20);
+		return createJobGraph(dataPath, clusterPath, this.resultPath, DOP, 20);
 	}
 
 	// -------------------------------------------------------------------------------------------------------------
@@ -93,7 +98,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks, numSubTasks);
+		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
 			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -114,7 +119,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	private static JobInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks, numSubTasks);
+		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -135,7 +140,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 
 	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -152,7 +157,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	}
 	
 	private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks);
+		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
 
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
@@ -163,7 +168,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		headConfig.setInputSerializer(serializer, 0);
 		
 		// back channel / iterations
-		headConfig.setBackChannelMemory(MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeBackChannelMemory(MEMORY_FRACTION_PER_CONSUMER);
 		
 		// output into iteration. broadcasting the centers
 		headConfig.setOutputSerializer(serializer);
@@ -190,7 +195,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 			TypeComparatorFactory<?> outputComparator)
 	{
 		JobTaskVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Map (Select nearest center)", jobGraph, numSubTasks, numSubTasks);
+			"Map (Select nearest center)", jobGraph, numSubTasks);
 		
 		TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
@@ -220,7 +225,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		// ---------------- the tail (co group) --------------------
 		
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph,
-			numSubTasks, numSubTasks);
+			numSubTasks);
 		
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
@@ -235,7 +240,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 
 		tailConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
 		tailConfig.setInputComparator(inputComparator, 0);
-		tailConfig.setMemoryInput(0, MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeMemoryInput(0, MEMORY_FRACTION_PER_CONSUMER);
 		tailConfig.setFilehandlesInput(0, 128);
 		tailConfig.setSpillingThresholdInput(0, 0.9f);
 		
@@ -279,7 +284,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		
 		JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
 		
-		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks);
+		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		
 		JobOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks);
 		
@@ -293,7 +298,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(head, mapper, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 		new TaskConfig(mapper.getConfiguration()).setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
 		new TaskConfig(mapper.getConfiguration()).setInputCached(0, true);
-		new TaskConfig(mapper.getConfiguration()).setInputMaterializationMemory(0, MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+		new TaskConfig(mapper.getConfiguration()).setRelativeInputMaterializationMemory(0,
+				MEMORY_FRACTION_PER_CONSUMER);
 
 		JobGraphUtils.connect(mapper, reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 		new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/CancellingTestBase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/CancellingTestBase.java
index 8ce656e..1fc289d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/CancellingTestBase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/CancellingTestBase.java
@@ -60,9 +60,13 @@ public abstract class CancellingTestBase {
 	 */
 	private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
 
+	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+
 	// --------------------------------------------------------------------------------------------
 	
 	protected NepheleMiniCluster executor;
+
+	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -83,7 +87,7 @@ public abstract class CancellingTestBase {
 		verifyJvmOptions();
 		this.executor = new NepheleMiniCluster();
 		this.executor.setDefaultOverwriteFiles(true);
-		
+		this.executor.setTaskManagerNumSlots(taskManagerNumSlots);
 		this.executor.start();
 	}
 
@@ -231,4 +235,8 @@ public abstract class CancellingTestBase {
 		final NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
+
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+	public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MapCancelingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MapCancelingITCase.java
index 1aeb229..7d48ae8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MapCancelingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MapCancelingITCase.java
@@ -27,6 +27,11 @@ import eu.stratosphere.types.Record;
 import eu.stratosphere.util.Collector;
 
 public class MapCancelingITCase extends CancellingTestBase {
+	private static final int DOP = 4;
+
+	public MapCancelingITCase() {
+		setTaskManagerNumSlots(DOP);
+	}
 	
 //	@Test
 	public void testMapCancelling() throws Exception {
@@ -40,7 +45,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 5 * 1000, 10 * 1000);
 	}
@@ -57,7 +62,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 5 * 1000, 10 * 1000);
 	}
@@ -74,7 +79,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}
@@ -91,7 +96,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MatchJoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MatchJoinCancelingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MatchJoinCancelingITCase.java
index 09413f0..82e2ace 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MatchJoinCancelingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/cancelling/MatchJoinCancelingITCase.java
@@ -30,6 +30,11 @@ import eu.stratosphere.types.Record;
 import eu.stratosphere.util.Collector;
 
 public class MatchJoinCancelingITCase extends CancellingTestBase {
+	private static final int DOP = 4;
+
+	public MatchJoinCancelingITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
 //	@Test
@@ -48,7 +53,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 3000, 10*1000);
 	}
@@ -69,7 +74,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 5000, 10*1000);
 	}
@@ -90,7 +95,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 5000);
 		
@@ -117,7 +122,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 30 * 1000, 30 * 1000);
 	}
@@ -145,7 +150,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 10 * 1000, 20 * 1000);
 	}
@@ -171,7 +176,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
+		p.setDefaultParallelism(DOP);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
index 984ecc2..b198d99 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/clients/examples/LocalExecutorITCase.java
@@ -23,8 +23,11 @@ import eu.stratosphere.client.LocalExecutor;
 import eu.stratosphere.test.recordJobs.wordcount.WordCount;
 import eu.stratosphere.test.testdata.WordCountData;
 
+
 public class LocalExecutorITCase {
 
+	private static final int DOP = 4;
+
 	@Test
 	public void testLocalExecutorWithWordCount() {
 		try {
@@ -40,14 +43,15 @@ public class LocalExecutorITCase {
 			
 			// run WordCount
 			WordCount wc = new WordCount();
-			wc.getPlan("4", inFile.toURI().toString(), outFile.toURI().toString());
-			
+
 			LocalExecutor executor = new LocalExecutor();
 			LocalExecutor.setLoggingLevel(Level.WARN);
 			executor.setDefaultOverwriteFiles(true);
+			executor.setTaskManagerNumSlots(DOP);
 			executor.start();
 			
-			executor.executePlan(wc.getPlan("4", inFile.toURI().toString(), outFile.toURI().toString()));
+			executor.executePlan(wc.getPlan(new Integer(DOP).toString(), inFile.toURI().toString(),
+					outFile.toURI().toString()));
 			executor.stop();
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
index 4a60836..272bce6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/WordCountITCase.java
@@ -25,7 +25,9 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected String resultPath;
 
 	public WordCountITCase(){
-		setNumTaskManager(2);
+		setDegreeOfParallelism(4);
+		setNumTaskTracker(2);
+		setTaskManagerNumSlots(2);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
index d2caeb7..831a9ae 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
@@ -27,7 +27,7 @@ public class ComputeEdgeDegreesITCase extends eu.stratosphere.test.recordJobTest
 	protected Plan getTestJob() {
 		ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees();
 		return computeDegrees.getScalaPlan(
-				config.getInteger("ComputeEdgeDegreesTest#NumSubtasks", 4),
+				config.getInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP),
 				edgesPath, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ConnectedComponentsITCase.java
index 6725bde..40f95af 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/ConnectedComponentsITCase.java
@@ -22,7 +22,7 @@ public class ConnectedComponentsITCase extends eu.stratosphere.test.iterative.Co
 	protected Plan getTestJob() {
 		ConnectedComponents cc = new ConnectedComponents();
 		Plan plan = cc.getScalaPlan(verticesPath, edgesPath, resultPath, 100);
-		plan.setDefaultParallelism(4);
+		plan.setDefaultParallelism(DOP);
 		return plan;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
index 5801d59..81b5c2a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
@@ -27,7 +27,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends eu.stratosphere.test.
 	protected Plan getTestJob() {
 		EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees();
 		return enumTriangles.getScalaPlan(
-				config.getInteger("EnumTrianglesTest#NumSubtasks", 4),
+				config.getInteger("EnumTrianglesTest#NumSubtasks", DOP),
 				edgesPath, resultPath);
 	}
 }


Mime
View raw message