flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/22] Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Sun, 22 Jun 2014 21:47:22 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 7b6b5a2e0 -> b4b633eab


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingMixedOrderITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingMixedOrderITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingMixedOrderITCase.java
index 51d7a66..a9bda2b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingMixedOrderITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingMixedOrderITCase.java
@@ -36,42 +36,44 @@ import eu.stratosphere.types.Key;
 public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
 	
 	private static final int NUM_RECORDS = 100000;
-	
+
 	private static final int RANGE_I1 = 100;
 	private static final int RANGE_I2 = 20;
 	private static final int RANGE_I3 = 20;
-	
+
 	private String recordsPath;
 	private String resultPath;
 
 	private String sortedRecords;
 
-
+	public GlobalSortingMixedOrderITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 
 	@Override
 	protected void preSubmit() throws Exception {
-		
+
 		ArrayList<TripleInt> records = new ArrayList<TripleInt>();
-		
+
 		//Generate records
 		final Random rnd = new Random(1988);
 		final StringBuilder sb = new StringBuilder(NUM_RECORDS * 7);
-		
-		
+
+
 		for (int j = 0; j < NUM_RECORDS; j++) {
 			TripleInt val = new TripleInt(rnd.nextInt(RANGE_I1), rnd.nextInt(RANGE_I2), rnd.nextInt(RANGE_I3));
 			records.add(val);
 			sb.append(val);
 			sb.append('\n');
 		}
-		
-		
+
+
 		this.recordsPath = createTempFile("records", sb.toString());
 		this.resultPath = getTempDirPath("result");
 
 		// create the sorted result;
 		Collections.sort(records);
-		
+
 		sb.setLength(0);
 		for (TripleInt val : records) {
 			sb.append(val);
@@ -83,7 +85,7 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		GlobalSort globalSort = new GlobalSort();
-		return globalSort.getPlan("4", recordsPath, resultPath);
+		return globalSort.getPlan(new Integer(DOP).toString(), recordsPath, resultPath);
 	}
 
 	@Override
@@ -91,22 +93,22 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
 		// Test results
 		compareResultsByLinesInMemoryWithStrictOrder(this.sortedRecords, this.resultPath);
 	}
-	
-	
+
+
 	public static class TripleIntDistribution implements DataDistribution {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		private boolean ascendingI1, ascendingI2, ascendingI3;
-		
+
 		public TripleIntDistribution(Order orderI1, Order orderI2, Order orderI3) {
 			this.ascendingI1 = orderI1 != Order.DESCENDING;
 			this.ascendingI2 = orderI2 != Order.DESCENDING;
 			this.ascendingI3 = orderI3 != Order.DESCENDING;
 		}
-		
+
 		public TripleIntDistribution() {}
-		
+
 		@Override
 		public void write(DataOutput out) throws IOException {
 			out.writeBoolean(this.ascendingI1);
@@ -129,7 +131,7 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
 			if (!this.ascendingI1) {
 				boundVal = RANGE_I1 - boundVal;
 			}
-			
+
 			return new Key[] { new IntValue(boundVal), new IntValue(RANGE_I2), new IntValue(RANGE_I3)
};
 		}
 
@@ -137,11 +139,11 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase
{
 		public int getNumberOfFields() {
 			return 3;
 		}
-		
+
 	}
-	
+
 	private static class GlobalSort implements Program {
-		
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -150,10 +152,10 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase
{
 			final int numSubtasks     = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
 			final String recordsPath = (args.length > 1 ? args[1] : "");
 			final String output      = (args.length > 2 ? args[2] : "");
-			
+
 			@SuppressWarnings("unchecked")
 			FileDataSource source = new FileDataSource(new CsvInputFormat(',', IntValue.class, IntValue.class,
IntValue.class), recordsPath);
-			
+
 			FileDataSink sink = new FileDataSink(CsvOutputFormat.class, output);
 			CsvOutputFormat.configureRecordFormat(sink)
 				.recordDelimiter('\n')
@@ -162,34 +164,34 @@ public class GlobalSortingMixedOrderITCase extends RecordAPITestBase
{
 				.field(IntValue.class, 0)
 				.field(IntValue.class, 1)
 				.field(IntValue.class, 2);
-			
+
 			sink.setGlobalOrder(
 				new Ordering(0, IntValue.class, Order.DESCENDING)
 					.appendOrdering(1, IntValue.class, Order.ASCENDING)
 					.appendOrdering(2, IntValue.class, Order.DESCENDING),
 				new TripleIntDistribution(Order.DESCENDING, Order.ASCENDING, Order.DESCENDING));
 			sink.setInput(source);
-			
+
 			Plan p = new Plan(sink);
 			p.setDefaultParallelism(numSubtasks);
 			return p;
 		}
 	}
-	
+
 	/**
 	 * Three integers sorting descending, ascending, descending.
 	 */
 	private static final class TripleInt implements Comparable<TripleInt> {
-		
+
 		private final int i1, i2, i3;
 
-		
+
 		private TripleInt(int i1, int i2, int i3) {
 			this.i1 = i1;
 			this.i2 = i2;
 			this.i3 = i3;
 		}
-		
+
 		@Override
 		public String toString() {
 			StringBuilder bld = new StringBuilder(32);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GroupOrderReduceITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GroupOrderReduceITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GroupOrderReduceITCase.java
index d9b244c..48479ed 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GroupOrderReduceITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GroupOrderReduceITCase.java
@@ -60,6 +60,7 @@ public class GroupOrderReduceITCase extends RecordAPITestBase {
 	
 	public GroupOrderReduceITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	
@@ -104,7 +105,7 @@ public class GroupOrderReduceITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("GroupOrderTest#NumSubtasks", 4);
+		config.setInteger("GroupOrderTest#NumSubtasks", DOP);
 		return toParameterList(config);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/MergeOnlyJoinITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/MergeOnlyJoinITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/MergeOnlyJoinITCase.java
index 47a0f59..f19ceee 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/MergeOnlyJoinITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/MergeOnlyJoinITCase.java
@@ -62,6 +62,7 @@ public class MergeOnlyJoinITCase extends RecordAPITestBase {
 
 	public MergeOnlyJoinITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(4);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/PairwiseSPITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/PairwiseSPITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/PairwiseSPITCase.java
index 38168a2..52de6ab 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/PairwiseSPITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/PairwiseSPITCase.java
@@ -66,7 +66,7 @@ public class PairwiseSPITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		PairwiseSP a2aSP = new PairwiseSP();
-		return a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", "4"),
+		return a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", new Integer(DOP).toString()),
 				rdfDataPath,
 				resultPath,
 				"true");
@@ -80,7 +80,7 @@ public class PairwiseSPITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("All2AllSPTest#NoSubtasks", 4);
+		config.setInteger("All2AllSPTest#NoSubtasks", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery10ITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery10ITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery10ITCase.java
index a19cad1..5450498 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery10ITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery10ITCase.java
@@ -200,7 +200,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("TPCHQuery10Test#NoSubtasks", 4);
+		config.setInteger("TPCHQuery10Test#NoSubtasks", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3ITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3ITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3ITCase.java
index cc1d16a..54a5fe1 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3ITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3ITCase.java
@@ -122,6 +122,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase {
 
 	public TPCHQuery3ITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -150,7 +151,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("dop", 4);
+		config.setInteger("dop", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3WithUnionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3WithUnionITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3WithUnionITCase.java
index b0c5200..df95484 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3WithUnionITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery3WithUnionITCase.java
@@ -121,6 +121,10 @@ public class TPCHQuery3WithUnionITCase extends RecordAPITestBase {
 	
 	private static final String EXPECTED_RESULT = "5|0|147828.97\n" + "66|0|99188.09\n";
 
+	public TPCHQuery3WithUnionITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -136,7 +140,7 @@ public class TPCHQuery3WithUnionITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		TPCHQuery3Unioned tpch3 = new TPCHQuery3Unioned();
 		return tpch3.getPlan(
-				"4",
+				new Integer(DOP).toString(),
 				orders1Path,
 				orders2Path,
 				partJoin1Path,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery4ITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery4ITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery4ITCase.java
index d8b9275..50097be 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery4ITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery4ITCase.java
@@ -112,6 +112,10 @@ public class TPCHQuery4ITCase extends RecordAPITestBase {
 
 	private static final String EXPECTED_RESULT = "1-URGENT|2|\n" + "3-MEDIUM|2|\n" + "4-NOT
SPECIFIED|4|";
 
+	public TPCHQuery4ITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		ordersPath = createTempFile("orders", ORDERS);
@@ -122,7 +126,7 @@ public class TPCHQuery4ITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		TPCHQuery4 tpch4 = new TPCHQuery4();
-		return tpch4.getPlan("4", ordersPath, lineitemsPath, resultPath);
+		return tpch4.getPlan(new Integer(DOP).toString(), ordersPath, lineitemsPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery9ITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery9ITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery9ITCase.java
index b1e9af6..a863236 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery9ITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQuery9ITCase.java
@@ -335,6 +335,10 @@ public class TPCHQuery9ITCase extends RecordAPITestBase {
 		+ "IRAN|1992|37970.953\n"
 		+ "IRAN|1993|83140.0\n"
 		+ "IRAN|1996|9672.556\n";
+
+	public TPCHQuery9ITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 		
 
 	@Override
@@ -352,7 +356,7 @@ public class TPCHQuery9ITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		TPCHQuery9 tpch9 = new TPCHQuery9();
 		return tpch9.getPlan(
-				"4",
+				new Integer(DOP).toString(),
 				partInputPath,
 				partSuppInputPath,
 				ordersInputPath,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQueryAsterixITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQueryAsterixITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQueryAsterixITCase.java
index 9c2ba26..0b6220c 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQueryAsterixITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TPCHQueryAsterixITCase.java
@@ -62,6 +62,10 @@ public class TPCHQueryAsterixITCase extends RecordAPITestBase {
 		"2|MACHINERY\n" +
 		"2|FURNITURE\n";
 
+	public TPCHQueryAsterixITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -73,7 +77,7 @@ public class TPCHQueryAsterixITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		TPCHQueryAsterix tpchBench = new TPCHQueryAsterix();
-		return tpchBench.getPlan("4", ordersPath, custPath, resultPath);
+		return tpchBench.getPlan(new Integer(DOP).toString(), ordersPath, custPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TeraSortITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TeraSortITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TeraSortITCase.java
index 97db904..ab2ee7b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TeraSortITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/TeraSortITCase.java
@@ -29,7 +29,10 @@ public class TeraSortITCase extends RecordAPITestBase {
 	private static final String INPUT_DATA_FILE = "/testdata/terainput.txt";
 	
 	private String resultPath;
-	
+
+	public TeraSortITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -41,7 +44,7 @@ public class TeraSortITCase extends RecordAPITestBase {
 		String testDataPath = getClass().getResource(INPUT_DATA_FILE).toString();
 		
 		TeraSort ts = new TeraSort();
-		return ts.getPlan("4", testDataPath, resultPath);
+		return ts.getPlan(new Integer(DOP).toString(), testDataPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WebLogAnalysisITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WebLogAnalysisITCase.java
index 8b60d52..a0458c7 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WebLogAnalysisITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WebLogAnalysisITCase.java
@@ -148,6 +148,10 @@ public class WebLogAnalysisITCase extends RecordAPITestBase {
 
 	private static final String expected = "87|url_24|39\n" + "59|url_28|41\n";
 
+	public WebLogAnalysisITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		docsPath   = createTempFile("docs", docs);
@@ -159,7 +163,7 @@ public class WebLogAnalysisITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		WebLogAnalysis relOLAP = new WebLogAnalysis();
-		return relOLAP.getPlan("4", docsPath, ranksPath, visitsPath, resultPath);
+		return relOLAP.getPlan(new Integer(DOP).toString(), docsPath, ranksPath, visitsPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountITCase.java
index 1a2d183..1adf4f8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountITCase.java
@@ -23,6 +23,10 @@ public class WordCountITCase extends RecordAPITestBase {
 	protected String textPath;
 	protected String resultPath;
 
+	public WordCountITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -33,7 +37,7 @@ public class WordCountITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		WordCount wc = new WordCount();
-		return wc.getPlan("4", textPath, resultPath);
+		return wc.getPlan(new Integer(DOP).toString(), textPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
index 30ce102..35a14c5 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/WordCountUnionReduceITCase.java
@@ -46,6 +46,10 @@ public class WordCountUnionReduceITCase extends RecordAPITestBase {
 
 	private String outputPath;
 
+	public WordCountUnionReduceITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -61,7 +65,7 @@ public class WordCountUnionReduceITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		WordCountUnionReduce wc = new WordCountUnionReduce();
-		return wc.getPlan(this.inputPath, this.outputPath, 4);
+		return wc.getPlan(this.inputPath, this.outputPath, DOP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
index fae3f99..a8ab311 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -30,6 +30,7 @@ import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.util.RecordAPITestBase;
 import eu.stratosphere.util.LogUtils;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.After;
@@ -53,9 +54,9 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 
 	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
 
-	private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
+	private static final String PARALLELISM_CONFIG_KEY = "num.subtasks";
 
-	private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
+	private static final String NUM_SLOTS_PER_TM_CONFIG_KEY = "num.slots.per.tm";
 
 	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
 
@@ -64,13 +65,35 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 	private static final int IS_SLOW_SLEEP_MS = 10;
 
 	private static final int IS_SLOW_EVERY_NUM_RECORDS = (2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
+	
+	// ------------------------------------------------------------------------
+	
+	private int dataVolumeGb;
+	private boolean useForwarder;
+	private boolean isSlowSender;
+	private boolean isSlowReceiver;
+	private int parallelism;
 
 	// ------------------------------------------------------------------------
 
 	public NetworkStackThroughput(Configuration config) {
 		super(config);
-
-		setNumTaskManager(2);
+		
+		dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+		useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
+		isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
+		isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
+		parallelism = config.getInteger(PARALLELISM_CONFIG_KEY, 1);
+		
+		int numSlots = config.getInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, 1);
+		
+		if (parallelism % numSlots != 0) {
+			throw new RuntimeException("The test case defines a parallelism that is not a multiple
of the slots per task manager.");
+		}
+		
+		setNumTaskTracker(parallelism / numSlots);
+		setTaskManagerNumSlots(numSlots);
+		
 		LogUtils.initializeDefaultConsoleLogger();
 	}
 
@@ -94,8 +117,8 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
 			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
 			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
-			config.setInteger(NUM_SUBTASKS_CONFIG_KEY, (Integer) p[4]);
-			config.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, (Integer) p[5]);
+			config.setInteger(PARALLELISM_CONFIG_KEY, (Integer) p[4]);
+			config.setInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, (Integer) p[5]);
 
 			configs.add(config);
 		}
@@ -107,14 +130,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-		boolean useForwarder = this.config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
-		boolean isSlowSender = this.config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-		boolean isSlowReceiver = this.config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-		int numSubtasks = this.config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1);
-		int numSubtasksPerInstance = this.config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY,
1);
-
-		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, numSubtasks,
numSubtasksPerInstance);
+		return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
 	}
 
 	@After
@@ -133,14 +149,13 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 	}
 
 	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
boolean isSlowReceiver,
-									int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException
{
+									int numSubtasks) throws JobGraphDefinitionException {
 
 		JobGraph jobGraph = new JobGraph("Speed Test");
 
 		JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
 		producer.setInputClass(SpeedTestProducer.class);
 		producer.setNumberOfSubtasks(numSubtasks);
-		producer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
 		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
 		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
 
@@ -149,13 +164,11 @@ public class NetworkStackThroughput extends RecordAPITestBase {
 			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
 			forwarder.setTaskClass(SpeedTestForwarder.class);
 			forwarder.setNumberOfSubtasks(numSubtasks);
-			forwarder.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
 		}
 
 		JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
 		consumer.setOutputClass(SpeedTestConsumer.class);
 		consumer.setNumberOfSubtasks(numSubtasks);
-		consumer.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
 		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
 		if (useForwarder) {


Mime
View raw message