flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/6] flink git commit: [FLINK-2275] [tests] Migrated test from execute() to collect()
Date Wed, 01 Jul 2015 15:56:06 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index d52055d..260de1c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -41,21 +41,19 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoContai
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.joda.time.DateTime;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import scala.math.BigInt;
 
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "unchecked", "UnusedDeclaration"})
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends MultipleProgramsTestBase {
 
@@ -63,24 +61,6 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		if(expected != null) {
-			compareResultsByLinesInMemory(expected, resultPath);
-		}
-	}
-
 	@Test
 	public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception{
 		/*
@@ -93,20 +73,20 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, Long>> reduceDs = ds.
 				groupBy(1).reduceGroup(new Tuple3GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, Long>> result = reduceDs.collect();
 
-		expected = "1,1\n" +
+		String expected = "1,1\n" +
 				"5,2\n" +
 				"15,3\n" +
 				"34,4\n" +
 				"65,5\n" +
 				"111,6\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
-	public void testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelectors() throws
-			Exception {
+	public void testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelectors() throws Exception {
 		/*
 		 * check correctness of groupReduce on tuples with multiple key field selector
 		 */
@@ -117,10 +97,9 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
 				groupBy(4, 0).reduceGroup(new Tuple5GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
 
-		expected = "1,1,0,P-),1\n" +
+		String expected = "1,1,0,P-),1\n" +
 				"2,3,0,P-),1\n" +
 				"2,2,0,P-),2\n" +
 				"3,9,0,P-),2\n" +
@@ -130,11 +109,12 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				"5,11,0,P-),1\n" +
 				"5,29,0,P-),2\n" +
 				"5,25,0,P-),3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
-	public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting() throws
-			Exception {
+	public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting() throws Exception {
 		/*
 		 * check correctness of groupReduce on tuples with key field selector and group sorting
 		 */
@@ -146,15 +126,17 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).sortGroup(2, Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n"
+				+
 				"5,2,Hello-Hello world\n" +
 				"15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
 				"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
 				"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
 				"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -169,15 +151,16 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, Long>> reduceDs = ds.
 				groupBy(new KeySelector1()).reduceGroup(new Tuple3GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, Long>> result = reduceDs.collect();
 
-		expected = "1,1\n" +
+		String expected = "1,1\n" +
 				"5,2\n" +
 				"15,3\n" +
 				"34,4\n" +
 				"65,5\n" +
 				"111,6\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
@@ -201,15 +184,16 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> reduceDs = ds.
 				groupBy(new KeySelector2()).reduceGroup(new CustomTypeGroupReduce());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
 
-		expected = "1,0,Hello!\n" +
+		String expected = "1,0,Hello!\n" +
 				"2,3,Hello!\n" +
 				"3,12,Hello!\n" +
 				"4,30,Hello!\n" +
 				"5,60,Hello!\n" +
 				"6,105,Hello!\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
@@ -232,10 +216,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
+
+		String expected = "231,91,Hello World\n";
 
-		expected = "231,91,Hello World\n";
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -249,10 +234,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
 		DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
 
-		expected = "91,210,Hello!";
+		String expected = "91,210,Hello!";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -269,21 +255,20 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,55\n" +
+		String expected = "1,1,55\n" +
 				"5,2,55\n" +
 				"15,3,55\n" +
 				"34,4,55\n" +
 				"65,5,55\n" +
 				"111,6,55\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
-	public void
-	testCorrectnessOfGroupReduceIfUDFReturnsInputObjectsMultipleTimesWhileChangingThem() throws
-			Exception{
+	public void testCorrectnessOfGroupReduceIfUDFReturnsInputObjectsMultipleTimesWhileChangingThem() throws Exception{
 		/*
 		 * check correctness of groupReduce if UDF returns input objects multiple times and changes it in between
 		 */
@@ -294,20 +279,20 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "11,1,Hi!\n" +
+		String expected = "11,1,Hi!\n" +
 				"21,1,Hi again!\n" +
 				"12,2,Hi!\n" +
 				"22,2,Hi again!\n" +
 				"13,2,Hi!\n" +
 				"23,2,Hi again!\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
-	public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine()
-			throws Exception {
+	public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine() throws Exception {
 		/*
 		 * check correctness of groupReduce on custom type with key extractor and combine
 		 */
@@ -319,15 +304,16 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> reduceDs = ds.
 				groupBy(new KeySelector3()).reduceGroup(new CustomTypeGroupReduceWithCombine());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
 
-		expected = "1,0,test1\n" +
+		String expected = "1,0,test1\n" +
 				"2,3,test2\n" +
 				"3,12,test3\n" +
 				"4,30,test4\n" +
 				"5,60,test5\n" +
 				"6,105,test6\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
@@ -341,7 +327,6 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testCorrectnessOfGroupReduceOnTuplesWithCombine() throws Exception {
-		
 		/*
 		 * check correctness of groupReduce on tuples with combine
 		 */
@@ -354,20 +339,20 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, String>> reduceDs = ds.
 				groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, String>> result = reduceDs.collect();
 
-		expected = "1,test1\n" +
+		String expected = "1,test1\n" +
 				"5,test2\n" +
 				"15,test3\n" +
 				"34,test4\n" +
 				"65,test5\n" +
 				"111,test6\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
 	public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
-		
 		/*
 		 * check correctness of all-groupreduce for tuples with combine
 		 */
@@ -383,11 +368,12 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
 				.withParameters(cfg);
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, String>> result = reduceDs.collect();
 
-		expected = "322," +
+		String expected = "322," +
 				"testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -402,20 +388,21 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).sortGroup(2, Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n"
+				+
 				"5,2,Hello world-Hello\n" +
 				"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
 				"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
 				"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
 				"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
-	public void testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector() throws
-			Exception {
+	public void testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector() throws Exception {
 		/*
 		 * check correctness of groupReduce on tuples with tuple-returning key selector
 		 */
@@ -426,10 +413,9 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
 				groupBy(new KeySelector4()).reduceGroup(new Tuple5GroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
 
-		expected = "1,1,0,P-),1\n" +
+		String expected = "1,1,0,P-),1\n" +
 				"2,3,0,P-),1\n" +
 				"2,2,0,P-),2\n" +
 				"3,9,0,P-),2\n" +
@@ -439,6 +425,8 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				"5,11,0,P-),1\n" +
 				"5,29,0,P-),2\n" +
 				"5,25,0,P-),3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector4 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
@@ -451,11 +439,10 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting() throws
-			Exception {
+	public void testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting() throws Exception {
 		/*
 		 * check that input of combiner is also sorted for combinable groupReduce with group sorting
-	 	 */
+		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
@@ -464,15 +451,16 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).sortGroup(0, Order.ASCENDING).reduceGroup(new OrderCheckingCombinableReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"2,2,Hello\n" +
 				"4,3,Hello world, how are you?\n" +
 				"7,4,Comment#1\n" +
 				"11,5,Comment#5\n" +
 				"16,6,Comment#10\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -487,10 +475,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<String, Integer>> reduceDs = ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal")
 				.reduceGroup(new GroupReducer1());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, Integer>> result = reduceDs.collect();
+
+		String expected = "aa,1\nbb,2\ncc,3\n";
 
-		expected = "aa,1\nbb,2\ncc,3\n";
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>> {
@@ -499,7 +488,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		@Override
 		public void reduce(Iterable<CrazyNested> values,
 				Collector<Tuple2<String, Integer>> out)
-		throws Exception {
+						throws Exception {
 			int c = 0; String n = null;
 			for(CrazyNested v : values) {
 				c++; // haha
@@ -520,10 +509,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Integer> reduceDs = ds.groupBy("special", "f2")
 				.reduceGroup(new GroupReducer2());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = reduceDs.collect();
 
-		expected = "3\n2\n";
+		String expected = "3\n2\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class GroupReducer2 implements GroupReduceFunction<FromTupleWithCTor, Integer> {
@@ -545,14 +535,15 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Integer> reduceDs = ds.groupBy("hadoopFan", "theTuple.*") // full tuple selection
 				.reduceGroup(new GroupReducer3());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "1\n5\n";
 
-		expected = "1\n5\n";
+		compareResultAsText(result, expected);
 	}
 
 	public static class GroupReducer3 implements GroupReduceFunction<PojoContainingTupleAndWritable, Integer> {
-		
+
 		@Override
 		public void reduce(Iterable<PojoContainingTupleAndWritable> values, Collector<Integer> out) {
 			out.collect(countElements(values));
@@ -571,10 +562,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Integer> reduceDs = ds.groupBy("f0", "f1.*") // nested full tuple selection
 				.reduceGroup(new GroupReducer4());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "3\n1\n";
 
-		expected = "3\n1\n";
+		compareResultAsText(result, expected);
 	}
 
 	public static class GroupReducer4 implements GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, Integer> {
@@ -598,15 +590,17 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
 				groupBy(1).sortGroup("f2", Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n"
+				+
 				"5,2,Hello world-Hello\n" +
 				"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
 				"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
 				"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
 				"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -619,12 +613,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+		String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
 				"b--(2,2)-\n"+
 				"c--(4,9)-(3,6)-(3,3)-\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -641,12 +636,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.sortGroup("f0.f0", Order.ASCENDING)
 				.sortGroup("f0.f1", Order.ASCENDING)
 				.reduceGroup(new NestedTupleReducer());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "a--(1,2)-(1,3)-(2,1)-\n" +
+		String expected = "a--(1,2)-(1,3)-(2,1)-\n" +
 				"b--(2,2)-\n"+
 				"c--(3,3)-(3,6)-(4,9)-\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -660,12 +656,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+		String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
 				"b--(2,2)-\n"+
 				"c--(4,9)-(3,3)-(3,6)-\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -679,12 +676,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "a--(2,1)-(1,3)-(1,2)-\n" +
+		String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
 				"b--(2,2)-\n"+
 				"c--(4,9)-(3,6)-(3,3)-\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -699,11 +697,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
 				.reduceGroup(new GroupReducer5());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "1---(10,100)-\n" +
+		String expected = "1---(10,100)-\n"
+				+
 				"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -721,16 +721,17 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.sortGroup(new StringFieldExtractor<Tuple3<Integer, Long, String>>(2), Order.DESCENDING)
 				.reduceGroup(new Tuple3SortedGroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
 
-		// return expected result
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n"
+				+
 				"5,2,Hello world-Hello\n" +
 				"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
 				"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
 				"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
 				"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class TwoTuplePojoExtractor implements KeySelector<CustomType, Tuple2<Integer, Integer>> {
@@ -765,22 +766,24 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.sortGroup(new StringPojoExtractor(), Order.DESCENDING)
 				.reduceGroup(new CustomTypeSortedGroupReduce());
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = reduceDs.collect();
 
-		// return expected result
-		expected = "1,0,Hi\n" +
+		String expected = "1,0,Hi\n"
+				+
 				"2,3,Hello world-Hello\n" +
 				"3,12,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
 				"4,30,Comment#4-Comment#3-Comment#2-Comment#1\n" +
 				"5,60,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
 				"6,105,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class LongFieldExtractor<T extends Tuple>  implements KeySelector<T, Long> {
 		private static final long serialVersionUID = 1L;
 		private int field;
 
+		
 		public LongFieldExtractor() { }
 
 		public LongFieldExtractor(int field) {
@@ -821,7 +824,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public String getKey(T t) throws Exception {
-			return ((Tuple)t).getField(field);
+			return t.getField(field);
 		}
 	}
 
@@ -840,19 +843,18 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.sortGroup(new StringFieldExtractor<Tuple3<Integer, Long, String>>(2), Order.DESCENDING)
 				.reduceGroup(new Tuple3SortedGroupReduceWithCombine());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, String>> result = reduceDs.collect();
 
-		// return expected result
-		if (super.mode == TestExecutionMode.COLLECTION) {
-			expected = null;
-		} else {
-			expected = "1,Hi\n" +
+		if (super.mode != TestExecutionMode.COLLECTION) {
+			String expected = "1,Hi\n"
+					+
 					"5,Hello world-Hello\n" +
 					"15,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
 					"34,Comment#4-Comment#3-Comment#2-Comment#1\n" +
 					"65,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
 					"111,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+			compareResultAsTuples(result, expected);
 		}
 	}
 
@@ -880,15 +882,16 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.sortGroup(new FiveToTwoTupleExtractor(), Order.DESCENDING)
 				.reduceGroup(new Tuple5SortedGroupReduce());
 
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
 
-		// return expected result
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n"
+				+
 				"2,5,0,Hallo Welt-Hallo Welt wie,1\n" +
 				"3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n" +
 				"4,34,0,FGH-CDE-EFG-DEF,1\n" +
 				"5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class GroupReducer5 implements GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String> {
@@ -924,10 +927,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("p2.a2")
 				.reduceGroup(new GroupReducer6());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
+
+		String expected = "b\nccc\nee\n";
 
-		expected = "b\nccc\nee\n";
+		compareResultAsText(result, expected);
 	}
 
 	public static class GroupReducer6 implements GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String> {
@@ -955,23 +959,24 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("key")
 				.reduceGroup(new GroupReducer7());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 
-		expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+		String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
+		
 		@Override
-		public void reduce(
-				Iterable<CollectionDataSets.PojoWithCollection> values,
-				Collector<String> out) throws Exception {
+		public void reduce(Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
 			StringBuilder concat = new StringBuilder();
 			concat.append("call");
-			for(CollectionDataSets.PojoWithCollection value : values) {
-				concat.append("For key "+value.key+" we got: ");
-				for(CollectionDataSets.Pojo1 p :value.pojos) {
-					concat.append("pojo.a="+p.a);
+			for (CollectionDataSets.PojoWithCollection value : values) {
+				concat.append("For key ").append(value.key).append(" we got: ");
+				
+				for (CollectionDataSets.Pojo1 p :value.pojos) {
+					concat.append("pojo.a=").append(p.a);
 				}
 			}
 			out.collect(concat.toString());
@@ -991,20 +996,22 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		// f0.f0 is first integer
 		DataSet<String> reduceDs = ds.groupBy("bigInt")
 				.reduceGroup(new GroupReducer8());
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = reduceDs.collect();
 		ExecutionConfig ec = env.getConfig();
 
 		// check if automatic type registration with Kryo worked
 		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(BigInt.class));
 		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(java.sql.Date.class));
 
+		String expected = null;
 
-		expected = "call\n" +
+		String localExpected = "[call\n" +
 				"For key 92233720368547758070 we got:\n" +
 				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
 				"For key 92233720368547758070 we got:\n" +
-				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}]";
+
+		Assert.assertEquals(localExpected, result.toString());
 	}
 
 	@Test
@@ -1035,34 +1042,33 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 								}
 							}
 						})
-				// add forward field information
-				.withForwardedFields("0")
-				// group again and reduce
-				.groupBy(0).reduceGroup(
-						new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
-							@Override
-							public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
-								int k = 0;
-								long s = 0;
-								for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
-									k = v.f0;
-									s += v.f1;
+						// add forward field information
+						.withForwardedFields("0")
+						// group again and reduce
+						.groupBy(0).reduceGroup(
+								new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+									@Override
+									public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
+										int k = 0;
+										long s = 0;
+										for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
+											k = v.f0;
+											s += v.f1;
+										}
+										out.collect(new Tuple2<Integer, Long>(k, s));
+									}
 								}
-								out.collect(new Tuple2<Integer, Long>(k, s));
-							}
-						}
-				);
+								);
 
-		reduceDs.writeAsCsv(resultPath);
+		List<Tuple2<Integer, Long>> result = reduceDs.collect();
 
-		env.execute();
-
-		expected = "1,1\n" +
+		String expected = "1,1\n" +
 				"2,5\n" +
 				"3,15\n" +
 				"4,34\n" +
 				"5,65\n";
 
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -1076,13 +1082,14 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			}
 		});
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "1\n" +
+				"2\n" +
+				"3\n" +
+				"4";
 
-		expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4";
+		compareResultAsText(result, expected);
 	}
 
 	/**
@@ -1096,10 +1103,11 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new Tuple2<Integer, DateTime>(1, DateTime.now()));
 		DataSet<Tuple2<Integer, DateTime>> reduceDs = ds.groupBy("f1").sum(0).project(0);
 
-		reduceDs.writeAsText(resultPath);
-		env.execute();
+		List<Tuple2<Integer, DateTime>> result = reduceDs.collect();
+
+		String expected = "1\n";
 
-		expected = "(1)\n";
+		compareResultAsTuples(result, expected);
 	}
 
 	/**
@@ -1125,23 +1133,21 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			}
 		});
 
-		r.writeAsText(resultPath);
-		env.execute();
+		List<String> result = r.collect();
 
-		expected = "0\n1\n2\n";
+		String expected = "0\n1\n2\n";
+		compareResultAsText(result, expected);
 	}
 
 
 
 	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
 		@Override
-		public void reduce(
-				Iterable<CollectionDataSets.PojoWithCollection> values,
-				Collector<String> out) throws Exception {
+		public void reduce(Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
 			StringBuilder concat = new StringBuilder();
 			concat.append("call");
-			for(CollectionDataSets.PojoWithCollection value : values) {
-				concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+			for (CollectionDataSets.PojoWithCollection value : values) {
+				concat.append("\nFor key ").append(value.bigInt).append(" we got:\n").append(value);
 			}
 			out.collect(concat.toString());
 		}
@@ -1149,10 +1155,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 	public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
 		@Override
-		public void reduce(
-				Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values,
-				Collector<String> out)
-				throws Exception {
+		public void reduce(Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, Collector<String> out) {
 			boolean once = false;
 			StringBuilder concat = new StringBuilder();
 			for(Tuple2<Tuple2<Integer, Integer>, String> value : values) {
@@ -1166,26 +1169,25 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			out.collect(concat.toString());
 		}
 	}
-	
+
 	public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) {
-			
 			int i = 0;
 			long l = 0l;
-			
+
 			for (Tuple3<Integer, Long, String> t : values) {
 				i += t.f0;
 				l = t.f1;
 			}
-			
+
 			out.collect(new Tuple2<Integer, Long>(i, l));
-			
+
 		}
 	}
-	
+
 	public static class Tuple3SortedGroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 
@@ -1195,21 +1197,21 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			int sum = 0;
 			long key = 0;
 			StringBuilder concat = new StringBuilder();
-			
+
 			for (Tuple3<Integer, Long, String> next : values) {
 				sum += next.f0;
 				key = next.f1;
 				concat.append(next.f2).append("-");
 			}
-			
+
 			if (concat.length() > 0) {
 				concat.setLength(concat.length() - 1);
 			}
-			
+
 			out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString()));
 		}
 	}
-	
+
 	public static class Tuple5GroupReduce implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 		private static final long serialVersionUID = 1L;
 
@@ -1221,13 +1223,13 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			int i = 0;
 			long l = 0l;
 			long l2 = 0l;
-			
+
 			for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) {
 				i = t.f0;
 				l += t.f1;
 				l2 = t.f4;
 			}
-			
+
 			out.collect(new Tuple5<Integer, Long, Integer, String, Long>(i, l, 0, "P-)", l2));
 		}
 	}
@@ -1237,8 +1239,8 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public void reduce(
-			Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
-			Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
+				Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
 		{
 			int i = 0;
 			long l = 0l;
@@ -1258,29 +1260,29 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			out.collect(new Tuple5<Integer, Long, Integer, String, Long>(i, l, 0, concat.toString(), l2));
 		}
 	}
-	
+
 	public static class CustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
-		
+
 
 		@Override
 		public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
 			final Iterator<CustomType> iter = values.iterator();
-			
+
 			CustomType o = new CustomType();
 			CustomType c = iter.next();
-			
+
 			o.myString = "Hello!";
 			o.myInt = c.myInt;
 			o.myLong = c.myLong;
-			
+
 			while (iter.hasNext()) {
 				CustomType next = iter.next();
 				o.myLong += next.myLong;
 			}
-			
+
 			out.collect(o);
-			
+
 		}
 	}
 
@@ -1303,7 +1305,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				CustomType next = iter.next();
 				concat.append("-").append(next.myString);
 				o.myLong += next.myLong;
-				
+
 			}
 
 			o.myString = concat.toString();
@@ -1319,7 +1321,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 
 			for ( Tuple3<Integer, Long, String> t : values ) {
-				
+
 				if(t.f0 < 4) {
 					t.f2 = "Hi!";
 					t.f0 += 10;
@@ -1331,74 +1333,74 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			}
 		}
 	}
-	
+
 	public static class AllAddingTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
 
 			int i = 0;
 			long l = 0l;
-			
+
 			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0;
 				l += t.f1;
 			}
-			
+
 			out.collect(new Tuple3<Integer, Long, String>(i, l, "Hello World"));
 		}
 	}
-	
+
 	public static class AllAddingCustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
 
 			CustomType o = new CustomType(0, 0, "Hello!");
-			
+
 			for (CustomType next : values) {
 				o.myInt += next.myInt;
 				o.myLong += next.myLong;
 			}
-			
+
 			out.collect(o);
 		}
 	}
-	
+
 	public static class BCTuple3GroupReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private String f2Replace = "";
-		
+
 		@Override
 		public void open(Configuration config) {
-			
+
 			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
 			int sum = 0;
 			for(Integer i : ints) {
 				sum += i;
 			}
 			f2Replace = sum+"";
-			
+
 		}
 
 		@Override
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
-				
+
 			int i = 0;
 			long l = 0l;
-			
+
 			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0;
 				l = t.f1;
 			}
-			
+
 			out.collect(new Tuple3<Integer, Long, String>(i, l, this.f2Replace));
-			
+
 		}
 	}
-	
+
 	@RichGroupReduceFunction.Combinable
 	public static class Tuple3GroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
@@ -1469,75 +1471,75 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			out.collect(new Tuple2<Integer, String>(i, s));
 		}
 	}
-	
+
 	@RichGroupReduceFunction.Combinable
 	public static class Tuple3AllGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
-			
+
 			Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, "");
-			
+
 			for ( Tuple3<Integer, Long, String> t : values ) {
 				o.f0 += t.f0;
 				o.f1 += t.f1;
 				o.f2 += "test";
 			}
-			
+
 			out.collect(o);
 		}
 
 		@Override
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
-			
+
 			int i = 0;
 			String s = "";
-			
+
 			for ( Tuple3<Integer, Long, String> t : values ) {
 				i += t.f0 + t.f1;
 				s += t.f2;
 			}
-			
+
 			out.collect(new Tuple2<Integer, String>(i, s));
-			
+
 		}
 	}
-	
+
 	@RichGroupReduceFunction.Combinable
 	public static class CustomTypeGroupReduceWithCombine extends RichGroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public void combine(Iterable<CustomType> values, Collector<CustomType> out) throws Exception {
-			
+
 			CustomType o = new CustomType();
-			
+
 			for ( CustomType c : values ) {
 				o.myInt = c.myInt;
 				o.myLong += c.myLong;
 				o.myString = "test"+c.myInt;
 			}
-			
+
 			out.collect(o);
 		}
 
 		@Override
 		public void reduce(Iterable<CustomType> values, Collector<CustomType> out)  {
-			
+
 			CustomType o = new CustomType(0, 0, "");
-			
+
 			for ( CustomType c : values) {
 				o.myInt = c.myInt;
 				o.myLong += c.myLong;
 				o.myString = c.myString;
 			}
-			
+
 			out.collect(o);
-			
+
 		}
 	}
-	
+
 	@RichGroupReduceFunction.Combinable
 	public static class OrderCheckingCombinableReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
@@ -1546,28 +1548,28 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
 			Iterator<Tuple3<Integer,Long,String>> it = values.iterator();
 			Tuple3<Integer,Long,String> t = it.next();
-			
+
 			int i = t.f0;
 			out.collect(t);
-			
+
 			while(it.hasNext()) {
 				t = it.next();
 				if(i > t.f0 || t.f2.equals("INVALID-ORDER!")) {
 					t.f2 = "INVALID-ORDER!";
 					out.collect(t);
 				}
-			}		
+			}
 		}
-		
+
 		@Override
-		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {	
-			
+		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
+
 			Iterator<Tuple3<Integer,Long,String>> it = values.iterator();
 			Tuple3<Integer,Long,String> t = it.next();
-			
+
 			int i = t.f0;
 			out.collect(t);
-			
+
 			while(it.hasNext()) {
 				t = it.next();
 				if(i > t.f0) {
@@ -1577,15 +1579,15 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			}
 
 		}
-		
-		
+
+
 	}
-	
+
 	public static final class IdentityMapper<T> extends RichMapFunction<T, T> {
 		@Override
 		public T map(T value) { return value; }
 	}
-	
+
 	private static int countElements(Iterable<?> iterable) {
 		int c = 0;
 		for (@SuppressWarnings("unused") Object o : iterable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index fe436a3..61e07fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -35,11 +36,7 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -53,22 +50,6 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception {
 		/*
@@ -81,16 +62,17 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(1)
-						.with(new T3T5FlatJoin());
+				.where(1)
+				.equalTo(1)
+				.with(new T3T5FlatJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hallo\n" +
+		String expected = "Hi,Hallo\n" +
 				"Hello,Hallo Welt\n" +
 				"Hello world,Hallo Welt\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -105,19 +87,20 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-						.where(0,1)
-						.equalTo(0,4)
-						.with(new T3T5FlatJoin());
+				.where(0,1)
+				.equalTo(0,4)
+				.with(new T3T5FlatJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hallo\n" +
+		String expected = "Hi,Hallo\n" +
 				"Hello,Hallo Welt\n" +
 				"Hello world,Hallo Welt wie gehts?\n" +
 				"Hello world,ABC\n" +
 				"I am fine.,HIJ\n" +
 				"I am fine.,IJK\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -132,16 +115,17 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<Tuple3<Integer, Long, String>,Tuple5<Integer, Long, Integer, String, Long>>> joinDs =
 				ds1.join(ds2)
-						.where(0)
-						.equalTo(2);
+				.where(0)
+				.equalTo(2);
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> result = joinDs.collect();
 
-		expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+		String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
 				"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
 				"(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
 
+		compareResultAsTuples(result, expected);
+
 	}
 
 	@Test
@@ -159,12 +143,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				.equalTo(1)
 				.with(new T3T5FlatJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hallo\n" +
+		String expected = "Hi,Hallo\n" +
 				"Hello,Hallo Welt\n" +
 				"Hello world,Hallo Welt\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -179,16 +164,17 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.joinWithTiny(ds2)
-						.where(1)
-						.equalTo(1)
-						.with(new T3T5FlatJoin());
+				.where(1)
+				.equalTo(1)
+				.with(new T3T5FlatJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hallo\n" +
+		String expected = "Hi,Hallo\n" +
 				"Hello,Hallo Welt\n" +
 				"Hello world,Hallo Welt\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -203,16 +189,17 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(1)
-						.with(new LeftReturningJoin());
+				.where(1)
+				.equalTo(1)
+				.with(new LeftReturningJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = joinDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"2,2,Hello\n" +
 				"3,2,Hello world\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -227,16 +214,17 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(1)
-						.with(new RightReturningJoin());
+				.where(1)
+				.equalTo(1)
+				.with(new RightReturningJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = joinDs.collect();
 
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n" +
 				"2,2,1,Hallo Welt,2\n" +
 				"2,2,1,Hallo Welt,2\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -253,18 +241,19 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
 		DataSet<Tuple3<String, String, Integer>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(4)
-						.with(new T3T5BCJoin())
-						.withBroadcastSet(intDs, "ints");
+				.where(1)
+				.equalTo(4)
+				.with(new T3T5BCJoin())
+				.withBroadcastSet(intDs, "ints");
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<String, String, Integer>> result = joinDs.collect();
 
-		expected = "Hi,Hallo,55\n" +
+		String expected = "Hi,Hallo,55\n" +
 				"Hi,Hallo Welt wie,55\n" +
 				"Hello,Hallo Welt,55\n" +
 				"Hello world,Hallo Welt,55\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -280,17 +269,18 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-						.where(new KeySelector1())
-						.equalTo(0)
-						.with(new CustT3Join());
+				.where(new KeySelector1())
+				.equalTo(0)
+				.with(new CustT3Join());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hi\n" +
+		String expected = "Hi,Hi\n" +
 				"Hello,Hello\n" +
 				"Hello world,Hello\n";
 
+		compareResultAsTuples(result, expected);
+
 	}
 
 	public static class KeySelector1 implements KeySelector<CustomType, Integer> {
@@ -312,19 +302,20 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(1)
-						.projectFirst(2,1)
-						.projectSecond(3)
-						.projectFirst(0)
-						.projectSecond(4,1);
+				.where(1)
+				.equalTo(1)
+				.projectFirst(2,1)
+				.projectSecond(3)
+				.projectFirst(0)
+				.projectSecond(4,1);
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple6<String, Long, String, Integer, Long, Long>> result = joinDs.collect();
 
-		expected = "Hi,1,Hallo,1,1,1\n" +
+		String expected = "Hi,1,Hallo,1,1,1\n" +
 				"Hello,2,Hallo Welt,2,2,2\n" +
 				"Hello world,2,Hallo Welt,3,2,2\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -339,19 +330,20 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple6<String, String, Long, Long, Long, Integer>> joinDs =
 				ds1.join(ds2)
-						.where(1)
-						.equalTo(1)
-						.projectSecond(3)
-						.projectFirst(2,1)
-						.projectSecond(4,1)
-						.projectFirst(0);
+				.where(1)
+				.equalTo(1)
+				.projectSecond(3)
+				.projectFirst(2,1)
+				.projectSecond(4,1)
+				.projectFirst(0);
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple6<String, String, Long, Long, Long, Integer>> result = joinDs.collect();
 
-		expected = "Hallo,Hi,1,1,1,1\n" +
+		String expected = "Hallo,Hi,1,1,1,1\n" +
 				"Hallo Welt,Hello,2,2,2,2\n" +
 				"Hallo Welt,Hello world,2,2,2,3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -367,15 +359,16 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-						.where(1).equalTo(new KeySelector2())
-						.with(new T3CustJoin());
+				.where(1).equalTo(new KeySelector2())
+				.with(new T3CustJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hello\n" +
+		String expected = "Hi,Hello\n" +
 				"Hello,Hello world\n" +
 				"Hello world,Hello world\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector2 implements KeySelector<CustomType, Long> {
@@ -398,21 +391,18 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple2<CustomType, CustomType>> joinDs =
 				ds1.join(ds2)
-						.where(
-							new KeySelector5()
-						)
-						.equalTo(
-								new KeySelector6()
-						);
+				.where(new KeySelector5())
+				.equalTo(new KeySelector6());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<CustomType, CustomType>> result = joinDs.collect();
 
-		expected = "1,0,Hi,1,0,Hi\n" +
+		String expected = "1,0,Hi,1,0,Hi\n" +
 				"2,1,Hello,2,1,Hello\n" +
 				"2,1,Hello,2,2,Hello world\n" +
 				"2,2,Hello world,2,1,Hello\n" +
 				"2,2,Hello world,2,2,Hello world\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector5 implements KeySelector<CustomType, Integer> {
@@ -441,19 +431,20 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-						.where(new KeySelector3())
-						.equalTo(new KeySelector4())
-						.with(new T3T5FlatJoin());
+				.where(new KeySelector3())
+				.equalTo(new KeySelector4())
+				.with(new T3T5FlatJoin());
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<String, String>> result = joinDs.collect();
 
-		expected = "Hi,Hallo\n" +
+		String expected = "Hi,Hallo\n" +
 				"Hello,Hallo Welt\n" +
 				"Hello world,Hallo Welt wie gehts?\n" +
 				"Hello world,ABC\n" +
 				"I am fine.,HIJ\n" +
 				"I am fine.,IJK\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector3 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
@@ -486,12 +477,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
 				ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
 				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
 				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -506,12 +498,13 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
 				ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
 				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
 				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -526,13 +519,14 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
 				ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
 				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
 				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -547,13 +541,14 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
 				ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
 				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
 				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -568,13 +563,14 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
 				ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
 				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
 				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -589,14 +585,15 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
 				ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect();
 
-		expected = "((1,1),one),((1,1),one)\n" +
+		String expected = "((1,1),one),((1,1),one)\n" +
 				"((2,2),two),((2,2),two)\n" +
 				"((3,3),three),((3,3),three)\n";
 
+		compareResultAsTuples(result, expected);
+
 	}
 
 	@Test
@@ -611,13 +608,14 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
 				ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect();
 
-		expected = "((1,1),one),((1,1),one)\n" +
+		String expected = "((1,1),one),((1,1),one)\n" +
 				"((2,2),two),((2,2),two)\n" +
 				"((3,3),three),((3,3),three)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -632,18 +630,19 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String> >> joinDs =
 				ds1.join(ds2).where("*").equalTo("*");
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> result = joinDs.collect();
 
-		expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
+		String expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" +
 				"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
 				"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
 	public void testNonPojoToVerifyNestedTupleElementSelectionWithFirstKeyFieldGreaterThanZero()
-	throws Exception {
+			throws Exception {
 		/*
 		 * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
 		 */
@@ -654,13 +653,14 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
 				ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
 
-		joinDs.writeAsCsv(resultPath);
 		env.setParallelism(1);
-		env.execute();
+		List<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> result = joinDs.collect();
 
-		expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
+		String expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
 				"((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
 				"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -672,11 +672,12 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joinDs = ds1.join(ds2).where(0).equalTo("*");
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Tuple3<Integer, Long, String>, Integer>> result = joinDs.collect();
 
-		expected = "(1,1,Hi),1\n" +
-			"(2,2,Hello),2";
+		String expected = "(1,1,Hi),1\n" +
+				"(2,2,Hello),2";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public void testJoinWithAtomicType2() throws Exception {
@@ -687,11 +688,12 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joinDs = ds1.join(ds2).where("*").equalTo(0);
 
-		joinDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, Tuple3<Integer, Long, String>>> result = joinDs.collect();
+
+		String expected = "1,(1,1,Hi)\n" +
+				"2,(2,2,Hello)";
 
-		expected = "1,(1,1,Hi)\n" +
-			"2,(2,2,Hello)";
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
@@ -705,41 +707,41 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		}
 
 	}
-	
+
 	public static class LeftReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
 
 		@Override
 		public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> first,
-												  Tuple5<Integer, Long, Integer, String, Long> second) {
-			
+				Tuple5<Integer, Long, Integer, String, Long> second) {
+
 			return first;
 		}
 	}
-	
+
 	public static class RightReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 
 		@Override
 		public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> first,
-																 Tuple5<Integer, Long, Integer, String, Long> second) {
-			
+				Tuple5<Integer, Long, Integer, String, Long> second) {
+
 			return second;
 		}
 	}
-		
+
 	public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
 
 		private int broadcast;
-		
+
 		@Override
 		public void open(Configuration config) {
-			
+
 			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
 			int sum = 0;
 			for(Integer i : ints) {
 				sum += i;
 			}
 			broadcast = sum;
-			
+
 		}
 
 		/*
@@ -750,24 +752,24 @@ public class JoinITCase extends MultipleProgramsTestBase {
 
 			return new Tuple3<String, String, Integer>(first.f2, second.f3, broadcast);
 		}
-		*/
+		 */
 
 		@Override
 		public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, String, Integer>> out) throws Exception {
 			out.collect(new Tuple3<String, String, Integer> (first.f2, second.f3, broadcast));
 		}
 	}
-	
+
 	public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
 
 		@Override
 		public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
-										   CustomType second) {
+				CustomType second) {
 
 			return new Tuple2<String, String>(first.f2, second.myString);
 		}
 	}
-	
+
 	public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index 083f0a3..e5a494b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -19,21 +19,17 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
 import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
@@ -46,22 +42,6 @@ public class MapITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testIdentityMapWithBasicType() throws Exception {
 		/*
@@ -74,10 +54,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<String> identityMapDs = ds.
 				map(new Mapper1());
 
-		identityMapDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = identityMapDs.collect();
 
-		expected = "Hi\n" +
+		String expected = "Hi\n" +
 				"Hello\n" +
 				"Hello world\n" +
 				"Hello world, how are you?\n" +
@@ -85,6 +64,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"Luke Skywalker\n" +
 				"Random comment\n" +
 				"LOL\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class Mapper1 implements MapFunction<String, String> {
@@ -108,10 +89,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
 				map(new Mapper2());
 
-		identityMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"2,2,Hello\n" +
 				"3,2,Hello world\n" +
 				"4,3,Hello world, how are you?\n" +
@@ -132,6 +112,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"19,6,Comment#13\n" +
 				"20,6,Comment#14\n" +
 				"21,6,Comment#15\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class Mapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
@@ -139,7 +121,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-		throws Exception {
+				throws Exception {
 			return value;
 		}
 	}
@@ -156,10 +138,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
 				map(new Mapper3());
 
-		typeConversionMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = typeConversionMapDs.collect();
 
-		expected = "1,0,Hi\n" +
+		String expected = "1,0,Hi\n" +
 				"2,1,Hello\n" +
 				"2,2,Hello world\n" +
 				"3,3,Hello world, how are you?\n" +
@@ -180,6 +161,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"6,18,Comment#13\n" +
 				"6,19,Comment#14\n" +
 				"6,20,Comment#15\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> {
@@ -207,10 +190,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<String> typeConversionMapDs = ds.
 				map(new Mapper4());
 
-		typeConversionMapDs.writeAsText(resultPath);
-		env.execute();
+		List<String> result = typeConversionMapDs.collect();
 
-		expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+		String expected = "Hi\n" + "Hello\n" + "Hello world\n" +
 				"Hello world, how are you?\n" +
 				"I am fine.\n" + "Luke Skywalker\n" +
 				"Comment#1\n" +	"Comment#2\n" +
@@ -221,6 +203,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"Comment#11\n" + "Comment#12\n" +
 				"Comment#13\n" + "Comment#14\n" +
 				"Comment#15\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> {
@@ -234,7 +218,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws
-			Exception {
+	Exception {
 		/*
 		 * Test mapper on tuple - Increment Integer field, reorder second and third fields
 		 */
@@ -245,10 +229,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
 				map(new Mapper5());
 
-		tupleMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, String, Long>> result = tupleMapDs.collect();
 
-		expected = "2,Hi,1\n" +
+		String expected = "2,Hi,1\n" +
 				"3,Hello,2\n" +
 				"4,Hello world,2\n" +
 				"5,Hello world, how are you?,3\n" +
@@ -269,6 +252,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"20,Comment#13,6\n" +
 				"21,Comment#14,6\n" +
 				"22,Comment#15,6\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class Mapper5 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>> {
@@ -277,7 +262,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value)
-		throws Exception {
+				throws Exception {
 			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
 			out.setFields(incr, value.f2, value.f1);
 			return out;
@@ -296,10 +281,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> customMapDs = ds.
 				map(new Mapper6());
 
-		customMapDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = customMapDs.collect();
 
-		expected = "1,0,hi\n" +
+		String expected = "1,0,hi\n" +
 				"2,1,hello\n" +
 				"2,2,hello world\n" +
 				"3,3,hello world, how are you?\n" +
@@ -320,6 +304,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"6,18,comment#13\n" +
 				"6,19,comment#14\n" +
 				"6,20,comment#15\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class Mapper6 implements MapFunction<CustomType, CustomType> {
@@ -347,10 +333,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
 				map(new Mapper7());
 
-		inputObjMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = inputObjMapDs.collect();
 
-		expected = "2,1,Hi\n" +
+		String expected = "2,1,Hi\n" +
 				"3,2,Hello\n" +
 				"4,2,Hello world\n" +
 				"5,3,Hello world, how are you?\n" +
@@ -371,6 +356,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"20,6,Comment#13\n" +
 				"21,6,Comment#14\n" +
 				"22,6,Comment#15\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class Mapper7 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
@@ -378,7 +365,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-		throws Exception {
+				throws Exception {
 			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
 			value.setField(incr, 0);
 			return value;
@@ -398,10 +385,9 @@ public class MapITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
 				map(new RichMapper1()).withBroadcastSet(ints, "ints");
-		bcMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
 
-		expected = "55,1,Hi\n" +
+		String expected = "55,1,Hi\n" +
 				"55,2,Hello\n" +
 				"55,2,Hello world\n" +
 				"55,3,Hello world, how are you?\n" +
@@ -422,6 +408,8 @@ public class MapITCase extends MultipleProgramsTestBase {
 				"55,6,Comment#13\n" +
 				"55,6,Comment#14\n" +
 				"55,6,Comment#15\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class RichMapper1 extends RichMapFunction<Tuple3<Integer,Long,String>,
@@ -442,7 +430,7 @@ public class MapITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-		throws Exception {
+				throws Exception {
 			out.setFields(f2Replace, value.f1, value.f2);
 			return out;
 		}
@@ -464,12 +452,13 @@ public class MapITCase extends MultipleProgramsTestBase {
 		conf.setInteger(testKey, testValue);
 		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
 				map(new RichMapper2()).withParameters(conf);
-		bcMapDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
 
-		expected = "1,1,Hi\n"
+		String expected = "1,1,Hi\n"
 				+ "2,2,Hello\n"
 				+ "3,2,Hello world";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class RichMapper2 extends RichMapFunction<Tuple3<Integer,Long,String>,

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index 3637680..c231eac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import java.util.HashSet;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -34,11 +35,7 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -49,22 +46,6 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
 	@Test
 	public void testHashPartitionByKeyField() throws Exception {
 		/*
@@ -77,15 +58,16 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		DataSet<Long> uniqLongs = ds
 				.partitionByHash(1)
 				.mapPartition(new UniqueLongMapper());
-		uniqLongs.writeAsText(resultPath);
-		env.execute();
+		List<Long> result = uniqLongs.collect();
 
-		expected = "1\n" +
+		String expected = "1\n" +
 				"2\n" +
 				"3\n" +
 				"4\n" +
 				"5\n" +
 				"6\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -100,15 +82,16 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		DataSet<Long> uniqLongs = ds
 				.partitionByHash(new KeySelector1())
 				.mapPartition(new UniqueLongMapper());
-		uniqLongs.writeAsText(resultPath);
-		env.execute();
+		List<Long> result = uniqLongs.collect();
 
-		expected = 	"1\n" +
+		String expected = "1\n" +
 				"2\n" +
 				"3\n" +
 				"4\n" +
 				"5\n" +
 				"6\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
@@ -134,26 +117,25 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
 				// introduce some partition skew by filtering
 				.filter(new Filter1())
-						// rebalance
+				// rebalance
 				.rebalance()
-						// count values in each partition
+				// count values in each partition
 				.map(new PartitionIndexMapper())
 				.groupBy(0)
 				.reduce(new Reducer1())
-						// round counts to mitigate runtime scheduling effects (lazy split assignment)
+				// round counts to mitigate runtime scheduling effects (lazy split assignment)
 				.map(new Mapper1());
 
-		uniqLongs.writeAsText(resultPath);
+		List<Tuple2<Integer, Integer>> result = uniqLongs.collect();
 
-		env.execute();
-
-		StringBuilder result = new StringBuilder();
+		StringBuilder expected = new StringBuilder();
 		int numPerPartition = 2220 / env.getParallelism() / 10;
 		for (int i = 0; i < env.getParallelism(); i++) {
-			result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
+			expected.append('(').append(i).append(',')
+			.append(numPerPartition).append(")\n");
 		}
 
-		expected = result.toString();
+		compareResultAsText(result, expected.toString());
 	}
 
 	public static class Filter1 implements FilterFunction<Long> {
@@ -172,13 +154,14 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 	public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
+		@Override
 		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
 			return new Tuple2<Integer, Integer>(v1.f0, v1.f1+v2.f1);
 		}
 	}
 
 	public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer,
-			Integer>>{
+	Integer>>{
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -202,16 +185,16 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		DataSet<Long> uniqLongs = ds
 				.partitionByHash(1).setParallelism(4)
 				.mapPartition(new UniqueLongMapper());
-		uniqLongs.writeAsText(resultPath);
+		List<Long> result = uniqLongs.collect();
 
-		env.execute();
-
-		expected = 	"1\n" +
+		String expected = "1\n" +
 				"2\n" +
 				"3\n" +
 				"4\n" +
 				"5\n" +
 				"6\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -227,13 +210,13 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		DataSet<Long> uniqLongs = ds
 				.partitionByHash("nestedPojo.longNumber").setParallelism(4)
 				.mapPartition(new UniqueNestedPojoLongMapper());
-		uniqLongs.writeAsText(resultPath);
+		List<Long> result = uniqLongs.collect();
 
-		env.execute();
-
-		expected = 	"10000\n" +
+		String expected = "10000\n" +
 				"20000\n" +
 				"30000\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer,Long,String>, Long> {
@@ -250,7 +233,7 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 			}
 		}
 	}
-	
+
 	public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -265,7 +248,7 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 			}
 		}
 	}
-	
+
 	public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
index aa75836..1054c62 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.List;
+
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -27,14 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 
 public class ProjectITCase extends JavaProgramTestBase {
 
-	private String resultPath;
-	private String expectedResult;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
 	@Override
 	protected void testProgram() throws Exception {
 		/*
@@ -45,11 +39,10 @@ public class ProjectITCase extends JavaProgramTestBase {
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple3<String, Long, Integer>> projDs = ds.
-						project(3,4,2);
-		projDs.writeAsCsv(resultPath);
+				project(3,4,2);
+		List<Tuple3<String, Long, Integer>> result = projDs.collect();
 
-		env.execute();
-		expectedResult = "Hallo,1,0\n" +
+		String expectedResult = "Hallo,1,0\n" +
 				"Hallo Welt,2,1\n" +
 				"Hallo Welt wie,1,2\n" +
 				"Hallo Welt wie gehts?,2,3\n" +
@@ -64,10 +57,8 @@ public class ProjectITCase extends JavaProgramTestBase {
 				"IJK,3,12\n" +
 				"JKL,2,13\n" +
 				"KLM,2,14\n";
+
+		compareResultAsTuples(result, expectedResult);
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
+
 }


Mime
View raw message