flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [75/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.
Date Thu, 18 Dec 2014 18:46:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index bc026c9..6fe549f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -18,82 +18,68 @@
 
 package org.apache.flink.test.exampleScalaPrograms;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.io.File;
 
-import org.apache.flink.configuration.Configuration;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.apache.flink.examples.scala.graph.PageRankBasic;
 import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class PageRankITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 2;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String verticesPath;
 	private String edgesPath;
 	private String resultPath;
-	private String expectedResult;
-	
-	public PageRankITCase(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-	}
+	private String expected;
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = runProgram(curProgId);
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		File resultFile = tempFolder.newFile();
+		//Delete file because the Scala API does not respect WriteMode set by the configuration
+		resultFile.delete();
+		resultPath = resultFile.toURI().toString();
+
+		File verticesFile = tempFolder.newFile();
+		Files.write(PageRankData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@After
+	public void after() throws Exception{
+		compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+	}
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+	@Test
+	public void testPageRankWithSmallNumberOfIterations() throws Exception {
+		PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
+		expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
 	}
-	
 
-	public String runProgram(int progId) throws Exception {
-		
-		switch(progId) {
-		case 1: {
-			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
-			return PageRankData.RANKS_AFTER_3_ITERATIONS;
-		}
-		case 2: {
-			// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
-			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
-			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-		}
-		
-		default: 
-			throw new IllegalArgumentException("Invalid program id");
-		}
+	@Test
+	public void testPageRankWithConvergence() throws Exception {
+		// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
+		PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+		expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 674ca49..aae7168 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.test.iterative.aggregators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.Random;
 
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
 import org.junit.Assert;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -33,12 +31,14 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -49,213 +49,185 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
  *
  */
 @RunWith(Parameterized.class)
-public class AggregatorsITCase extends JavaProgramTestBase {
+public class AggregatorsITCase extends MultipleProgramsTestBase {
 
-	private static final int NUM_PROGRAMS = 5;
-	private static final int MAX_ITERATIONS = 20;	
+	private static final int MAX_ITERATIONS = 20;
 	private static final int DOP = 2;
+	private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
+
+	public AggregatorsITCase(ExecutionMode mode){
+		super(mode);
+	}
 
-	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
-	private String expectedResult;
+	private String expected;
 
-	public AggregatorsITCase(Configuration config) {
-		super(config);
-	}
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = AggregatorProgs.runProgram(curProgId, resultPath);
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void postSubmit() throws Exception {
+	@Test
+	public void testAggregatorWithoutParameterForIterate() throws Exception {
+		/*
+		 * Test aggregator without parameter for iterate
+		 */
 
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DOP);
 
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+		// register aggregator
+		LongSumAggregator aggr = new LongSumAggregator();
+		iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
+		// register convergence criterion
+		iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+				new NegativeElementsConvergenceCriterion());
+
+		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
+		iteration.closeWith(updatedDs).writeAsText(resultPath);
+		env.execute();
 
-		return toParameterList(tConfigs);
+		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
 	}
 
-	private static class AggregatorProgs {
+	@Test
+	public void testAggregatorWithParameterForIterate() throws Exception {
+		/*
+		 * Test aggregator with parameter for iterate
+		 */
 
-		private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DOP);
 
-		public static String runProgram(int progId, String resultPath) throws Exception {
+		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
 
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test aggregator without parameter for iterate
-				 */
+		// register aggregator
+		LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
+		iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(DOP);
-
-				DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
-				IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
-				// register aggregator
-				LongSumAggregator aggr = new LongSumAggregator();
-				iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-				
-				// register convergence criterion
-				iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-						new NegativeElementsConvergenceCriterion());
-				
-				DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-				iteration.closeWith(updatedDs).writeAsText(resultPath);
-				env.execute();
-
-				// return expected result
-				return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				 		+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				 		+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
-			}
-			case 2: {
-				/*
-				 * Test aggregator with parameter for iterate
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(DOP);
-
-				DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
-				IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
-				// register aggregator
-				LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
-				iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-				
-				// register convergence criterion
-				iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-						new NegativeElementsConvergenceCriterion());
-				
-				DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-				iteration.closeWith(updatedDs).writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				 		+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				 		+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
-			}
-			case 3: {
-				/*
+		// register convergence criterion
+		iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+				new NegativeElementsConvergenceCriterion());
+
+		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
+		iteration.closeWith(updatedDs).writeAsText(resultPath);
+		env.execute();
+
+		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+	}
+
+	@Test
+	public void testConvergenceCriterionWithParameterForIterate() throws Exception {
+		/*
 				 * Test convergence criterion with parameter for iterate
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(DOP);
-
-				DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
-				IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
-				// register aggregator
-				LongSumAggregator aggr = new LongSumAggregator();
-				iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-				
-				// register convergence criterion
-				iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, 
-						new NegativeElementsConvergenceCriterionWithParam(3));
-				
-				DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-				iteration.closeWith(updatedDs).writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				 		+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				 		+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
-			}
-			case 4: {
-				/*
-				 * Test aggregator without parameter for iterateDelta
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(DOP);
-				
-				DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
-						
-				DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
-						initialSolutionSet, MAX_ITERATIONS, 0);
-
-				// register aggregator
-				LongSumAggregator aggr = new LongSumAggregator();
-				iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-				
-				DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
-				
-				DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
-						.where(0).equalTo(0).flatMap(new UpdateFilter());
-				
-				DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-				DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-				result.writeAsText(resultPath);
-				
-				env.execute();
-				
-				// return expected result
-				return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				 		+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				 		+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
-				
-			}
-			case 5: {
-				/*
-				 * Test aggregator with parameter for iterateDelta
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(DOP);
-				
-				DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
-						
-				DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
-						initialSolutionSet, MAX_ITERATIONS, 0);
-
-				// register aggregator
-				LongSumAggregator aggr = new LongSumAggregatorWithParameter(4);
-				iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-				
-				DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
-				
-				DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
-						.where(0).equalTo(0).flatMap(new UpdateFilter());
-				
-				DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-				DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-				result.writeAsText(resultPath);
-				
-				env.execute();
-				
-				// return expected result
-				return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				 		+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				 		+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
-			}
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
 
-		}
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DOP);
+
+		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
+
+		// register aggregator
+		LongSumAggregator aggr = new LongSumAggregator();
+		iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+		// register convergence criterion
+		iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+				new NegativeElementsConvergenceCriterionWithParam(3));
+
+		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
+		iteration.closeWith(updatedDs).writeAsText(resultPath);
+		env.execute();
+
+		expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+	}
+
+	@Test
+	public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
+		/*
+		 * Test aggregator without parameter for iterateDelta
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DOP);
+
+		DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+		DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
+				initialSolutionSet, MAX_ITERATIONS, 0);
+
+		// register aggregator
+		LongSumAggregator aggr = new LongSumAggregator();
+		iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+		DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
+
+		DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
+				.where(0).equalTo(0).flatMap(new UpdateFilter());
+
+		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
+		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
+		result.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+	}
+
+	@Test
+	public void testAggregatorWithParameterForIterateDelta() throws Exception {
+		/*
+		 * Test aggregator with parameter for iterateDelta
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DOP);
+
+		DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+		DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
+				initialSolutionSet, MAX_ITERATIONS, 0);
+
+		// register aggregator
+		LongSumAggregator aggr = new LongSumAggregatorWithParameter(4);
+		iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+		DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
+
+		DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
+				.where(0).equalTo(0).flatMap(new UpdateFilter());
+
+		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
+		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
+		result.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
 	}
 
 	@SuppressWarnings("serial")
@@ -294,7 +266,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 		@Override
 		public void open(Configuration conf) {
 
-			aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 		}
 
 		@Override
@@ -316,7 +288,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 		@Override
 		public void open(Configuration conf) {
 
-			aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 		}
 
 		@Override
@@ -366,11 +338,11 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 		@Override
 		public void open(Configuration conf) {
 
-			aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 			superstep = getIterationRuntimeContext().getSuperstepNumber();
 
 			if (superstep > 1) {
-				previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+				previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
 				// check previous aggregator value
 				Assert.assertEquals(superstep - 1, previousAggr.getValue());
 			}
@@ -429,11 +401,11 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 		@Override
 		public void open(Configuration conf) {
 
-			aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 			superstep = getIterationRuntimeContext().getSuperstepNumber();
 
 			if (superstep > 1) {
-				previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+				previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
 
 				// check previous aggregator value
 				switch(superstep) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index 61ad863..3fbcae6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -18,135 +18,107 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class AggregateITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 3;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public AggregateITCase(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+public class AggregateITCase extends MultipleProgramsTestBase {
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = AggregateProgs.runProgram(curProgId, resultPath);
+
+	public AggregateITCase(ExecutionMode mode){
+		super(mode);
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
-	
-	private static class AggregateProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
+
+	@Test
+	public void testFullAggregate() throws Exception {
+		/*
 				 * Full Aggregate
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Integer, Long>> aggregateDs = ds
-						.aggregate(Aggregations.SUM, 0)
-						.and(Aggregations.MAX, 1)
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+				.aggregate(Aggregations.SUM, 0)
+				.and(Aggregations.MAX, 1)
 						.project(0, 1);
-				
-				aggregateDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "231,6\n";
-			}
-			case 2: {
-				/*
+
+		aggregateDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "231,6\n";
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		/*
 				 * Grouped Aggregate
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
-						.aggregate(Aggregations.SUM, 0)
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.SUM, 0)
 						.project(1, 0);
-				
-				aggregateDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1\n" +
+
+		aggregateDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1\n" +
 				"2,5\n" +
 				"3,15\n" +
 				"4,34\n" +
 				"5,65\n" +
 				"6,111\n";
-			} 
-			case 3: {
-				/*
-				 * Nested Aggregate
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
-						.aggregate(Aggregations.MIN, 0)
-						.aggregate(Aggregations.MIN, 0)
+	}
+
+	@Test
+	public void testNestedAggregate() throws Exception {
+		/*
+		 * Nested Aggregate
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.MIN, 0)
+				.aggregate(Aggregations.MIN, 0)
 						.project(0);
-				
-				aggregateDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1\n";
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-		}
+
+		aggregateDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1\n";
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index ffc208c..b249e22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -39,478 +36,486 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class CoGroupITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 13;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+	public CoGroupITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public CoGroupITCase(Configuration config) {
-		super(config);
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = CoGroupProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+		/*
+				 * CoGroup on tuples with key field selector
+				 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
+
+		coGroupDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,0\n" +
+				"2,6\n" +
+				"3,24\n" +
+				"4,60\n" +
+				"5,120\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+		/*
+				 * CoGroup on two custom type inputs with key extractors
+				 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new
+				KeySelector5()).with(new CustomTypeCoGroup());
+
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1,0,test\n" +
+				"2,6,test\n" +
+				"3,24,test\n" +
+				"4,60,test\n" +
+				"5,120,test\n" +
+				"6,210,test\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	public static class KeySelector4 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+	public static class KeySelector5 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
 		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class CoGroupProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				
-				/*
-				 * CoGroup on tuples with key field selector
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
-				
-				coGroupDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0\n" +
-						"2,6\n" +
-						"3,24\n" +
-						"4,60\n" +
-						"5,120\n";
-			}
-			case 2: {
-				
-				/*
-				 * CoGroup on two custom type inputs with key extractors
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).equalTo(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).with(new CustomTypeCoGroup());
-				
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,test\n" +
-						"2,6,test\n" +
-						"3,24,test\n" +
-						"4,60,test\n" +
-						"5,120,test\n" +
-						"6,210,test\n";
-			}
-			case 3: {
-				
-				/*
-				 * check correctness of cogroup if UDF returns left input objects
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
-				
-				coGroupDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n" +
-						"4,3,Hello world, how are you?\n" +
-						"5,3,I am fine.\n";
-				
-			}
-			case 4: {
-				
-				/*
-				 * check correctness of cogroup if UDF returns right input objects
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
-				
-				coGroupDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,Hallo,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"3,4,3,Hallo Welt wie gehts?,2\n" +
-						"3,5,4,ABC,2\n" +
-						"3,6,5,BCD,3\n";
-				
-			}
-			case 5: {
-				
-				/*
-				 * Reduce with broadcast set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
-				
-				coGroupDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,55\n" +
-						"2,6,55\n" +
-						"3,24,55\n" +
-						"4,60,55\n" +
-						"5,120,55\n";
-			}
-			case 6: {
-				
-				/*
-				 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).with(new MixedCoGroup());
-				
-				coGroupDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "0,1,test\n" +
-						"1,2,test\n" +
-						"2,5,test\n" +
-						"3,15,test\n" +
-						"4,33,test\n" +
-						"5,63,test\n" +
-						"6,109,test\n" +
-						"7,4,test\n" + 
-						"8,4,test\n" + 
-						"9,4,test\n" + 
-						"10,5,test\n" + 
-						"11,5,test\n" + 
-						"12,5,test\n" + 
-						"13,5,test\n" +
-						"14,5,test\n"; 
-						
-			}
-			case 7: {
-				
-				/*
-				 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).equalTo(2).with(new MixedCoGroup2());
-				
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "0,1,test\n" +
-						"1,2,test\n" +
-						"2,5,test\n" +
-						"3,15,test\n" +
-						"4,33,test\n" +
-						"5,63,test\n" +
-						"6,109,test\n" +
-						"7,4,test\n" + 
-						"8,4,test\n" + 
-						"9,4,test\n" + 
-						"10,5,test\n" + 
-						"11,5,test\n" + 
-						"12,5,test\n" + 
-						"13,5,test\n" +
-						"14,5,test\n"; 
-				
-			}
-			case 8: {
-				/*
-				 * CoGroup with multiple key fields
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-						where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
-				
-				coGrouped.writeAsCsv(resultPath);
-				env.execute();
-				
-				return "1,1,Hallo\n" +
-						"2,2,Hallo Welt\n" +
-						"3,2,Hallo Welt wie gehts?\n" +
-						"3,2,ABC\n" +
-						"5,3,HIJ\n" +
-						"5,3,IJK\n";
-			}
-			case 9: {
-				/*
-				 * CoGroup with multiple key fields
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-						where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-								return new Tuple2<Integer, Long>(t.f0, t.f4);
-							}
-						}).
-						equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
-							private static final long serialVersionUID = 1L;
-							
-							@Override
-							public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
-								return new Tuple2<Integer, Long>(t.f0, t.f1);
-							}
-						}).with(new Tuple5Tuple3CoGroup());
-				
-				coGrouped.writeAsCsv(resultPath);
-				env.execute();
-				
-				return "1,1,Hallo\n" +
-						"2,2,Hallo Welt\n" +
-						"3,2,Hallo Welt wie gehts?\n" +
-						"3,2,ABC\n" +
-						"5,3,HIJ\n" +
-						"5,3,IJK\n";
-			}
-			case 10: {
-				/*
-				 * CoGroup on two custom type inputs using expression keys
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
-				
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,test\n" +
-						"2,6,test\n" +
-						"3,24,test\n" +
-						"4,60,test\n" +
-						"5,120,test\n" +
-						"6,210,test\n";
-			}
-			case 11: {
-				/*
-				 * CoGroup on two custom type inputs using expression keys
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-						.where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
-						private static final long serialVersionUID = 1L;
-
-						@Override
-						public void coGroup(
-								Iterable<POJO> first,
-								Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-								Collector<CustomType> out) throws Exception {
-							for(POJO p : first) {
-								for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-									Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-									out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-								}
-							}
-						}
-				});
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"-1,20000,Flink\n" +
-						"-1,10000,Flink\n" +
-						"-1,30000,Flink\n";
-			}
-			case 12: {
-				/*
-				 * CoGroup field-selector (expression keys) + key selector function
-				 * The key selector is unnecessary complicated (Tuple1) ;)
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-						.where(new KeySelector<POJO, Tuple1<Long>>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public Tuple1<Long> getKey(POJO value)
-									throws Exception {
-								return new Tuple1<Long>(value.nestedPojo.longNumber);
-							}
-						}).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
-							private static final long serialVersionUID = 1L;
-
-						@Override
-						public void coGroup(
-								Iterable<POJO> first,
-								Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-								Collector<CustomType> out) throws Exception {
-							for(POJO p : first) {
-								for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-									Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-									out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-								}
-							}
-						}
-				});
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"-1,20000,Flink\n" +
-						"-1,10000,Flink\n" +
-						"-1,30000,Flink\n";
+
+	@Test
+	public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
+		/*
+		 * check correctness of cogroup if UDF returns left input objects
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
+
+		coGroupDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n";
+	}
+
+	@Test
+	public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
+		/*
+		 * check correctness of cogroup if UDF returns right input objects
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
+
+		coGroupDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"3,4,3,Hallo Welt wie gehts?,2\n" +
+				"3,5,4,ABC,2\n" +
+				"3,6,5,BCD,3\n";
+	}
+
+	@Test
+	public void testCoGroupWithBroadcastSet() throws Exception {
+		/*
+		 * Reduce with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
+
+		coGroupDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,0,55\n" +
+				"2,6,55\n" +
+				"3,24,55\n" +
+				"4,60,55\n" +
+				"5,120,55\n";
+	}
+
+	@Test
+	public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+	throws Exception {
+		/*
+		 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new
+				KeySelector2()).with(new MixedCoGroup());
+
+		coGroupDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "0,1,test\n" +
+				"1,2,test\n" +
+				"2,5,test\n" +
+				"3,15,test\n" +
+				"4,33,test\n" +
+				"5,63,test\n" +
+				"6,109,test\n" +
+				"7,4,test\n" +
+				"8,4,test\n" +
+				"9,4,test\n" +
+				"10,5,test\n" +
+				"11,5,test\n" +
+				"12,5,test\n" +
+				"13,5,test\n" +
+				"14,5,test\n";
+	}
+
+	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+			throws Exception {
+		/*
+		 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with
+				(new MixedCoGroup2());
+
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "0,1,test\n" +
+				"1,2,test\n" +
+				"2,5,test\n" +
+				"3,15,test\n" +
+				"4,33,test\n" +
+				"5,63,test\n" +
+				"6,109,test\n" +
+				"7,4,test\n" +
+				"8,4,test\n" +
+				"9,4,test\n" +
+				"10,5,test\n" +
+				"11,5,test\n" +
+				"12,5,test\n" +
+				"13,5,test\n" +
+				"14,5,test\n";
+
+	}
+
+	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Integer getKey(CustomType in) {
+			return in.myInt;
+		}
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
+		/*
+		 * CoGroup with multiple key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+
+		coGrouped.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
+		/*
+		 * CoGroup with multiple key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(new KeySelector7()).
+				equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup());
+
+		coGrouped.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+	}
+
+	public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
+	Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f4);
+		}
+	}
+
+	public static class KeySelector8 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+			return new Tuple2<Integer, Long>(t.f0, t.f1);
+		}
+	}
+
+	@Test
+	public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
+		/*
+		 * CoGroup on two custom type inputs using expression keys
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
+
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1,0,test\n" +
+				"2,6,test\n" +
+				"3,24,test\n" +
+				"4,60,test\n" +
+				"5,120,test\n" +
+				"6,210,test\n";
+	}
+
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
+			Exception {
+		/*
+		 * CoGroup on two custom type inputs using expression keys
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = 	"-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+	}
+
+	public static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for(POJO p : first) {
+				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
 			}
-			case 13: {
-				/*
-				 * CoGroup field-selector (expression keys) + key selector function
-				 * The key selector is simple here
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-						.where(new KeySelector<POJO, Long>() {
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public Long getKey(POJO value)
-									throws Exception {
-								return value.nestedPojo.longNumber;
-							}
-						}).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
-							private static final long serialVersionUID = 1L;
-
-						@Override
-						public void coGroup(
-								Iterable<POJO> first,
-								Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-								Collector<CustomType> out) throws Exception {
-							for(POJO p : first) {
-								for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-									Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-									out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-								}
-							}
-						}
-				});
-				coGroupDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"-1,20000,Flink\n" +
-						"-1,10000,Flink\n" +
-						"-1,30000,Flink\n";
+		}
+	}
+
+	@Test
+	public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
+		/*
+		 * CoGroup field-selector (expression keys) + key selector function
+		 * The key selector is unnecessary complicated (Tuple1) ;)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where(new KeySelector6()).equalTo(6).with(new CoGroup3());
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = 	"-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+
+	}
+
+	public static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple1<Long> getKey(POJO value)
+		throws Exception {
+			return new Tuple1<Long>(value.nestedPojo.longNumber);
+		}
+	}
+
+	public static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer,
+			String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for(POJO p : first) {
+				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
 			}
-			
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+
+	@Test
+	public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+		/*
+		 * CoGroup field-selector (expression keys) + key selector function
+		 * The key selector is simple here
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+				.where(new KeySelector1()).equalTo(6).with(new CoGroup2());
+		coGroupDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "-1,20000,Flink\n" +
+				"-1,10000,Flink\n" +
+				"-1,30000,Flink\n";
+	}
+
+	public static class KeySelector1 implements KeySelector<POJO, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long getKey(POJO value)
+		throws Exception {
+			return value.nestedPojo.longNumber;
+		}
+	}
+
+	public static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String,
+			Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			for(POJO p : first) {
+				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
 			}
-			
 		}
-	
 	}
-	
+
 	public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index 7d79ea5..bd32bfc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RichCrossFunction;
@@ -32,371 +29,336 @@ import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
-public class CrossITCase extends JavaProgramTestBase {
-	
-	private static int NUM_PROGRAMS = 11;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
+public class CrossITCase extends MultipleProgramsTestBase {
+
+	public CrossITCase(ExecutionMode mode){
+		super(mode);
+	}
+
 	private String resultPath;
-	private String expectedResult;
-	
-	public CrossITCase(Configuration config) {
-		super(config);
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = CrossProgs.runProgram(curProgId, resultPath);
+	@Test
+	public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
+		/*
+		 * check correctness of cross on two tuple inputs
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "0,HalloHallo\n" +
+				"1,HalloHallo Welt\n" +
+				"2,HalloHallo Welt wie\n" +
+				"1,Hallo WeltHallo\n" +
+				"2,Hallo WeltHallo Welt\n" +
+				"3,Hallo WeltHallo Welt wie\n" +
+				"2,Hallo Welt wieHallo\n" +
+				"3,Hallo Welt wieHallo Welt\n" +
+				"4,Hallo Welt wieHallo Welt wie\n";
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
+
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
+		/*
+		 * check correctness of cross if UDF returns left input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"1,1,Hi\n" +
+				"1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"2,2,Hello\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"3,2,Hello world\n" +
+				"3,2,Hello world\n";
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
+		/*
+		 * check correctness of cross if UDF returns right input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "1,1,0,Hallo,1\n" +
+				"1,1,0,Hallo,1\n" +
+				"1,1,0,Hallo,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,3,2,Hallo Welt wie,1\n";
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
 	}
-	
-	private static class CrossProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				
-				/*
-				 * check correctness of cross on two tuple inputs 
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "0,HalloHallo\n" +
-						"1,HalloHallo Welt\n" +
-						"2,HalloHallo Welt wie\n" +
-						"1,Hallo WeltHallo\n" +
-						"2,Hallo WeltHallo Welt\n" +
-						"3,Hallo WeltHallo Welt wie\n" +
-						"2,Hallo Welt wieHallo\n" +
-						"3,Hallo Welt wieHallo Welt\n" +
-						"4,Hallo Welt wieHallo Welt wie\n";
-			}
-			case 2: {
-				
-				/*
-				 * check correctness of cross if UDF returns left input object
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"1,1,Hi\n" +
-						"1,1,Hi\n" +
-						"2,2,Hello\n" +
-						"2,2,Hello\n" +
-						"2,2,Hello\n" +
-						"3,2,Hello world\n" +
-						"3,2,Hello world\n" +
-						"3,2,Hello world\n";
-				
-			}
-			case 3: {
-				
-				/*
-				 * check correctness of cross if UDF returns right input object
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,Hallo,1\n" +
-						"1,1,0,Hallo,1\n" +
-						"1,1,0,Hallo,1\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"2,2,1,Hallo Welt,2\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"2,3,2,Hallo Welt wie,1\n" +
-						"2,3,2,Hallo Welt wie,1\n";
-				
-			}
-			case 4: {
-				
-				/*
-				 * check correctness of cross with broadcast set
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "2,0,55\n" +
-						"3,0,55\n" +
-						"3,0,55\n" +
-						"3,0,55\n" +
-						"4,1,55\n" +
-						"4,2,55\n" +
-						"3,0,55\n" +
-						"4,2,55\n" +
-						"4,4,55\n";
-			}
-			case 5: {
-				
-				/*
-				 * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross)
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "0,HalloHallo\n" +
-						"1,HalloHallo Welt\n" +
-						"2,HalloHallo Welt wie\n" +
-						"1,Hallo WeltHallo\n" +
-						"2,Hallo WeltHallo Welt\n" +
-						"3,Hallo WeltHallo Welt wie\n" +
-						"2,Hallo Welt wieHallo\n" +
-						"3,Hallo Welt wieHallo Welt\n" +
-						"4,Hallo Welt wieHallo Welt wie\n";
-				
-			}
-			case 6: {
-				
-				/*
-				 * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "0,HalloHallo\n" +
-						"1,HalloHallo Welt\n" +
-						"2,HalloHallo Welt wie\n" +
-						"1,Hallo WeltHallo\n" +
-						"2,Hallo WeltHallo Welt\n" +
-						"3,Hallo WeltHallo Welt wie\n" +
-						"2,Hallo Welt wieHallo\n" +
-						"3,Hallo Welt wieHallo Welt\n" +
-						"4,Hallo Welt wieHallo Welt wie\n";
-				
-			}
-			case 7: {
 
-			/*
-			 * project cross on a tuple input 1
-			 */
+	@Test
+	public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+		/*
+		 * check correctness of cross with broadcast set
+		 */
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
-					.projectFirst(2, 1)
-					.projectSecond(3)
-					.projectFirst(0)
-					.projectSecond(4,1);
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
 
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
 
-				// return expected result
-				return "Hi,1,Hallo,1,1,1\n" +
-					"Hi,1,Hallo Welt,1,2,2\n" +
-					"Hi,1,Hallo Welt wie,1,1,3\n" +
-					"Hello,2,Hallo,2,1,1\n" +
-					"Hello,2,Hallo Welt,2,2,2\n" +
-					"Hello,2,Hallo Welt wie,2,1,3\n" +
-					"Hello world,2,Hallo,3,1,1\n" +
-					"Hello world,2,Hallo Welt,3,2,2\n" +
-					"Hello world,2,Hallo Welt wie,3,1,3\n";
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
 
-			}
-			case 8: {
+		expected = "2,0,55\n" +
+				"3,0,55\n" +
+				"3,0,55\n" +
+				"3,0,55\n" +
+				"4,1,55\n" +
+				"4,2,55\n" +
+				"3,0,55\n" +
+				"4,2,55\n" +
+				"4,4,55\n";
+	}
+
+	@Test
+	public void testCorrectnessOfCrossWithHuge() throws Exception {
+		/*
+		 * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "0,HalloHallo\n" +
+				"1,HalloHallo Welt\n" +
+				"2,HalloHallo Welt wie\n" +
+				"1,Hallo WeltHallo\n" +
+				"2,Hallo WeltHallo Welt\n" +
+				"3,Hallo WeltHallo Welt wie\n" +
+				"2,Hallo Welt wieHallo\n" +
+				"3,Hallo Welt wieHallo Welt\n" +
+				"4,Hallo Welt wieHallo Welt wie\n";
+	}
+
+	@Test
+	public void testCorrectnessOfCrossWithTiny() throws Exception {
+		/*
+		 * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "0,HalloHallo\n" +
+				"1,HalloHallo Welt\n" +
+				"2,HalloHallo Welt wie\n" +
+				"1,Hallo WeltHallo\n" +
+				"2,Hallo WeltHallo Welt\n" +
+				"3,Hallo WeltHallo Welt wie\n" +
+				"2,Hallo Welt wieHallo\n" +
+				"3,Hallo Welt wieHallo Welt\n" +
+				"4,Hallo Welt wieHallo Welt wie\n";
+	}
 
-			/*
-			 * project cross on a tuple input 2
-			 */
+	@Test
+	public void testProjectCrossOnATupleInput1() throws Exception{
+		/*
+		 * project cross on a tuple input 1
+		 */
 
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-					DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-					DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2)
-						.projectSecond(3)
-						.projectFirst(2, 1)
-						.projectSecond(4,1)
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
+				.projectFirst(2, 1)
+				.projectSecond(3)
+				.projectFirst(0)
+					.projectSecond(4,1);
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "Hi,1,Hallo,1,1,1\n" +
+				"Hi,1,Hallo Welt,1,2,2\n" +
+				"Hi,1,Hallo Welt wie,1,1,3\n" +
+				"Hello,2,Hallo,2,1,1\n" +
+				"Hello,2,Hallo Welt,2,2,2\n" +
+				"Hello,2,Hallo Welt wie,2,1,3\n" +
+				"Hello world,2,Hallo,3,1,1\n" +
+				"Hello world,2,Hallo Welt,3,2,2\n" +
+				"Hello world,2,Hallo Welt wie,3,1,3\n";
+	}
+
+	@Test
+	public void testProjectCrossOnATupleInput2() throws Exception {
+		/*
+		 * project cross on a tuple input 2
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2)
+				.projectSecond(3)
+				.projectFirst(2, 1)
+				.projectSecond(4,1)
 						.projectFirst(0);
 
-					crossDs.writeAsCsv(resultPath);
-					env.execute();
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
 
-					// return expected result
-					return "Hallo,Hi,1,1,1,1\n" +
-						"Hallo Welt,Hi,1,2,2,1\n" +
-						"Hallo Welt wie,Hi,1,1,3,1\n" +
-						"Hallo,Hello,2,1,1,2\n" +
-						"Hallo Welt,Hello,2,2,2,2\n" +
-						"Hallo Welt wie,Hello,2,1,3,2\n" +
-						"Hallo,Hello world,2,1,1,3\n" +
-						"Hallo Welt,Hello world,2,2,2,3\n" +
-						"Hallo Welt wie,Hello world,2,1,3,3\n";
+		expected = "Hallo,Hi,1,1,1,1\n" +
+				"Hallo Welt,Hi,1,2,2,1\n" +
+				"Hallo Welt wie,Hi,1,1,3,1\n" +
+				"Hallo,Hello,2,1,1,2\n" +
+				"Hallo Welt,Hello,2,2,2,2\n" +
+				"Hallo Welt wie,Hello,2,1,3,2\n" +
+				"Hallo,Hello world,2,1,1,3\n" +
+				"Hallo Welt,Hello world,2,2,2,3\n" +
+				"Hallo Welt wie,Hello world,2,1,3,3\n";
 
-			}
-			case 9: {
-				/*
-				 * check correctness of default cross
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
-						"(1,1,Hi),(1,1,0,Hallo,1)\n" +
-						"(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
-						"(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
-						"(2,2,Hello),(1,1,0,Hallo,1)\n" +
-						"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
-						"(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
-						"(3,2,Hello world),(1,1,0,Hallo,1)\n" +
-						"(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
-				
-			}
+	}
 
-			case 10: {
-				
-				/*
-				 * check correctness of cross on two custom type inputs
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-				DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross());
-				
-				crossDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,HiHi\n"
-						+ "2,1,HiHello\n"
-						+ "2,2,HiHello world\n"
-						+ "2,1,HelloHi\n"
-						+ "4,2,HelloHello\n"
-						+ "4,3,HelloHello world\n"
-						+ "2,2,Hello worldHi\n"
-						+ "4,3,Hello worldHello\n"
-						+ "4,4,Hello worldHello world";
-			}
-			
-			case 11: {
-				
-				/*
-				 * check correctness of cross a tuple input and a custom type input
-				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross());
-				
-				crossDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "2,0,HalloHi\n" +
-						"3,0,HalloHello\n" +
-						"3,0,HalloHello world\n" +
-						"3,0,Hallo WeltHi\n" +
-						"4,1,Hallo WeltHello\n" +
-						"4,2,Hallo WeltHello world\n" +
-						"3,0,Hallo Welt wieHi\n" +
-						"4,2,Hallo Welt wieHello\n" +
-						"4,4,Hallo Welt wieHello world\n";
-				
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
+	@Test
+	public void testCorrectnessOfDefaultCross() throws Exception {
+		/*
+		 * check correctness of default cross
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+				"(1,1,Hi),(1,1,0,Hallo,1)\n" +
+				"(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
+				"(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
+				"(2,2,Hello),(1,1,0,Hallo,1)\n" +
+				"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+				"(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
+				"(3,2,Hello world),(1,1,0,Hallo,1)\n" +
+				"(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+	}
+
+	@Test
+	public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
+		/*
+		 * check correctness of cross on two custom type inputs
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross());
+
+		crossDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "1,0,HiHi\n"
+				+ "2,1,HiHello\n"
+				+ "2,2,HiHello world\n"
+				+ "2,1,HelloHi\n"
+				+ "4,2,HelloHello\n"
+				+ "4,3,HelloHello world\n"
+				+ "2,2,Hello worldHi\n"
+				+ "4,3,Hello worldHello\n"
+				+ "4,4,Hello worldHello world";
+	}
+
+	@Test
+	public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
+		/*
+		 * check correctness of cross a tuple input and a custom type input
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross());
+
+		crossDs.writeAsCsv(resultPath);
+		env.execute();
+
+		expected = "2,0,HalloHi\n" +
+				"3,0,HalloHello\n" +
+				"3,0,HalloHello world\n" +
+				"3,0,Hallo WeltHi\n" +
+				"4,1,Hallo WeltHello\n" +
+				"4,2,Hallo WeltHello world\n" +
+				"3,0,Hallo Welt wieHi\n" +
+				"4,2,Hallo Welt wieHello\n" +
+				"4,4,Hallo Welt wieHello world\n";
 	}
 	
 	public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {


Mime
View raw message