flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/10] flink git commit: [tests] Adapted CoGroupITCase to use "collect()" rather than writing temp files (increases robustness)
Date Tue, 12 May 2015 21:03:20 GMT
[tests] Adapted CoGroupITCase to use "collect()" rather than writing temp files (increases
robustness)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e79ff4eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e79ff4eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e79ff4eb

Branch: refs/heads/master
Commit: e79ff4ebfc896c827ee5cfde57570ff42fb04e10
Parents: 4bd3525
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue May 12 18:47:57 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 12 21:35:58 2015 +0200

----------------------------------------------------------------------
 .../test/javaApiOperators/CoGroupITCase.java    | 204 ++++++++++++-------
 1 file changed, 130 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e79ff4eb/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 84c05d6..115ab0d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -34,19 +35,19 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.After;
+
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends MultipleProgramsTestBase {
 
@@ -54,27 +55,12 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
+	/*
+	 * CoGroup on tuples with key field selector
+	 */
 	@Test
 	public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
-		/*
-				 * CoGroup on tuples with key field selector
-				 */
+		
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -82,14 +68,15 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new
Tuple5CoGroup());
 
-		coGroupDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple2<Integer, Integer>> result = coGroupDs.collect();
 
-		expected = "1,0\n" +
+		String expected = "1,0\n" +
 				"2,6\n" +
 				"3,24\n" +
 				"4,60\n" +
 				"5,120\n";
+		
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -105,15 +92,16 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new
 				KeySelector5()).with(new CustomTypeCoGroup());
 
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = coGroupDs.collect();
 
-		expected = "1,0,test\n" +
+		String expected = "1,0,test\n" +
 				"2,6,test\n" +
 				"3,24,test\n" +
 				"4,60,test\n" +
 				"5,120,test\n" +
 				"6,210,test\n";
+		
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector4 implements KeySelector<CustomType, Integer> {
@@ -144,14 +132,15 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new
Tuple3ReturnLeft());
 
-		coGroupDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
 
-		expected = "1,1,Hi\n" +
+		String expected = "1,1,Hi\n" +
 				"2,2,Hello\n" +
 				"3,2,Hello world\n" +
 				"4,3,Hello world, how are you?\n" +
 				"5,3,I am fine.\n";
+		
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -165,16 +154,17 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new
Tuple5ReturnRight());
+		
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = coGroupDs.collect();
 
-		coGroupDs.writeAsCsv(resultPath);
-		env.execute();
-
-		expected = "1,1,0,Hallo,1\n" +
+		String expected = "1,1,0,Hallo,1\n" +
 				"2,2,1,Hallo Welt,2\n" +
 				"2,3,2,Hallo Welt wie,1\n" +
 				"3,4,3,Hallo Welt wie gehts?,2\n" +
 				"3,5,4,ABC,2\n" +
 				"3,6,5,BCD,3\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -191,14 +181,15 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new
Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
 
-		coGroupDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Integer, Integer>> result = coGroupDs.collect();
 
-		expected = "1,0,55\n" +
+		String expected = "1,0,55\n" +
 				"2,6,55\n" +
 				"3,24,55\n" +
 				"4,60,55\n" +
 				"5,120,55\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -215,10 +206,9 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new
 				KeySelector2()).with(new MixedCoGroup());
 
-		coGroupDs.writeAsCsv(resultPath);
-		env.execute();
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
 
-		expected = "0,1,test\n" +
+		String expected = "0,1,test\n" +
 				"1,2,test\n" +
 				"2,5,test\n" +
 				"3,15,test\n" +
@@ -233,6 +223,8 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 				"12,5,test\n" +
 				"13,5,test\n" +
 				"14,5,test\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
@@ -257,10 +249,9 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with
 				(new MixedCoGroup2());
 
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
+		List<CustomType> result = coGroupDs.collect();
 
-		expected = "0,1,test\n" +
+		String expected = "0,1,test\n" +
 				"1,2,test\n" +
 				"2,5,test\n" +
 				"3,15,test\n" +
@@ -276,6 +267,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 				"13,5,test\n" +
 				"14,5,test\n";
 
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
@@ -300,15 +292,16 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
 				where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
 
-		coGrouped.writeAsCsv(resultPath);
-		env.execute();
-
-		expected = "1,1,Hallo\n" +
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+		
+		String expected = "1,1,Hallo\n" +
 				"2,2,Hallo Welt\n" +
 				"3,2,Hallo Welt wie gehts?\n" +
 				"3,2,ABC\n" +
 				"5,3,HIJ\n" +
 				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	@Test
@@ -326,15 +319,16 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 				where(new KeySelector7()).
 				equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup());
 
-		coGrouped.writeAsCsv(resultPath);
-		env.execute();
-
-		expected = "1,1,Hallo\n" +
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+		
+		String expected = "1,1,Hallo\n" +
 				"2,2,Hallo Welt\n" +
 				"3,2,Hallo Welt wie gehts?\n" +
 				"3,2,ABC\n" +
 				"5,3,HIJ\n" +
 				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
 	}
 
 	public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
@@ -368,15 +362,16 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
 		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new
CustomTypeCoGroup());
 
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
-
-		expected = "1,0,test\n" +
+		List<CustomType> result = coGroupDs.collect();
+		
+		String expected = "1,0,test\n" +
 				"2,6,test\n" +
 				"3,24,test\n" +
 				"4,60,test\n" +
 				"5,120,test\n" +
 				"6,210,test\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -392,12 +387,14 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 				.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
 
-		expected = 	"-1,20000,Flink\n" +
+		List<CustomType> result = coGroupDs.collect();
+		
+		String expected = 	"-1,20000,Flink\n" +
 				"-1,10000,Flink\n" +
 				"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	public static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String,
Integer, Integer, Long, String, Long>, CustomType> {
@@ -430,13 +427,14 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 				.where(new KeySelector6()).equalTo(6).with(new CoGroup3());
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
 
-		expected = 	"-1,20000,Flink\n" +
+		List<CustomType> result = coGroupDs.collect();
+		
+		String expected = 	"-1,20000,Flink\n" +
 				"-1,10000,Flink\n" +
 				"-1,30000,Flink\n";
 
+		compareResultAsText(result, expected);
 	}
 
 	public static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>>
{
@@ -480,12 +478,14 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 				.where(new KeySelector1()).equalTo(6).with(new CoGroup2());
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
 
-		expected = "-1,20000,Flink\n" +
+		List<CustomType> result = coGroupDs.collect();
+		
+		String expected = "-1,20000,Flink\n" +
 				"-1,10000,Flink\n" +
 				"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -496,11 +496,12 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new
CoGroupAtomic1());
 
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
-
-		expected = "(1,1,Hi)\n" +
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+		
+		String expected = "(1,1,Hi)\n" +
 			"(2,2,Hello)";
+
+		compareResultAsText(result, expected);
 	}
 
 	@Test
@@ -510,14 +511,69 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
 
 		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new
CoGroupAtomic2());
+		
+		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
+		
+		String expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+
+		compareResultAsText(result, expected);
+	}
 
-		coGroupDs.writeAsText(resultPath);
-		env.execute();
 
-		expected = "(1,1,Hi)\n" +
-			"(2,2,Hello)";
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+
+	private <T> void compareResultAsTuples(List<T> result, String expected) {
+		compareResult(result, expected, true);
+	}
+
+	private <T> void compareResultAsText(List<T> result, String expected) {
+		compareResult(result, expected, false);
 	}
+	
+	private <T> void compareResult(List<T> result, String expected, boolean asTuples)
{
+		String[] extectedStrings = expected.split("\n");
+		String[] resultStrings = new String[result.size()];
+		
+		for (int i = 0; i < resultStrings.length; i++) {
+			T val = result.get(i);
+			
+			if (asTuples) {
+				if (val instanceof Tuple) {
+					Tuple t = (Tuple) val;
+					Object first = t.getField(0);
+					StringBuilder bld = new StringBuilder(first == null ? "null" : first.toString());
+					for (int pos = 1; pos < t.getArity(); pos++) {
+						Object next = t.getField(pos);
+						bld.append(',').append(next == null ? "null" : next.toString());
+					}
+					resultStrings[i] = bld.toString();
+				}
+				else {
+					throw new IllegalArgumentException(val + " is no tuple");
+				}
+			}
+			else {
+				resultStrings[i] = (val == null) ? "null" : val.toString();
+			}
+		}
+		
+		assertEquals("Wrong number of elements result", extectedStrings.length, resultStrings.length);
 
+		Arrays.sort(extectedStrings);
+		Arrays.sort(resultStrings);
+		
+		for (int i = 0; i < extectedStrings.length; i++) {
+			assertEquals(extectedStrings[i], resultStrings[i]);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  UDF classes
+	// --------------------------------------------------------------------------------------------
+	
 	public static class KeySelector1 implements KeySelector<POJO, Long> {
 		private static final long serialVersionUID = 1L;
 


Mime
View raw message