flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/7] flink git commit: [hotfix] [tests] Clean up and simplify ObjectReuseITCase
Date Sun, 27 Dec 2015 12:13:40 GMT
[hotfix] [tests] Clean up and simplify ObjectReuseITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac1e19e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac1e19e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac1e19e

Branch: refs/heads/master
Commit: 2ac1e19e0d1a5da1a6e3c990ad676af836d4f1c0
Parents: c246ff2
Author: Stephan Ewen <sewen@apache.org>
Authored: Sat Dec 26 19:47:18 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Dec 26 19:47:18 2015 +0100

----------------------------------------------------------------------
 .../test/util/MultipleProgramsTestBase.java     |  11 +-
 .../javaApiOperators/ObjectReuseITCase.java     | 321 +++++++++----------
 .../flink/test/web/WebFrontendITCase.java       |   9 +-
 3 files changed, 154 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ac1e19e/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 38116e2..d7f09bd 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -79,7 +79,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	protected final TestExecutionMode mode;
 
 	
-	public MultipleProgramsTestBase(TestExecutionMode mode){
+	public MultipleProgramsTestBase(TestExecutionMode mode) {
 		this.mode = mode;
 		
 		switch(mode){
@@ -99,7 +99,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
-	public static void setup() throws Exception{
+	public static void setup() throws Exception {
 		cluster = TestBaseUtils.startCluster(
 			1,
 			DEFAULT_PARALLELISM,
@@ -118,8 +118,9 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	// ------------------------------------------------------------------------
 	
 	@Parameterized.Parameters(name = "Execution mode = {0}")
-	public static Collection<TestExecutionMode[]> executionModes(){
-		return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},
-				new TestExecutionMode[]{TestExecutionMode.COLLECTION});
+	public static Collection<Object[]> executionModes() {
+		return Arrays.asList(
+				new Object[] { TestExecutionMode.CLUSTER },
+				new Object[] { TestExecutionMode.COLLECTION });
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac1e19e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
index 2ea4823..63abe63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
@@ -23,227 +23,194 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.*;
+
 /**
  * These check whether the object-reuse execution mode does really reuse objects.
  */
+@SuppressWarnings("serial" )
 @RunWith(Parameterized.class)
-public class ObjectReuseITCase extends JavaProgramTestBase {
-
-	private static int NUM_PROGRAMS = 4;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-
-	private static String inReducePath;
-	private static String inGroupReducePath;
-
-	private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n";
-	private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n";
-
-	public ObjectReuseITCase(Configuration config) {
-		super(config);
-	}
+public class ObjectReuseITCase extends MultipleProgramsTestBase {
+
+	private static final List<Tuple2<String, Integer>> REDUCE_DATA =
+		Arrays.asList(
+			new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+			new Tuple2<>("a", 3), new Tuple2<>("a", 4),
+			new Tuple2<>("a", 50));
+
+	private static final List<Tuple2<String, Integer>> GROUP_REDUCE_DATA =
+		Arrays.asList(
+			new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+			new Tuple2<>("a", 3), new Tuple2<>("a", 4),
+			new Tuple2<>("a", 5));
 	
-	@Override
-	protected void preSubmit() throws Exception {
-		inReducePath = createTempFile("in_reduce.txt", IN_REDUCE);
-		inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = Progs.runProgram(curProgId, resultPath);
-	}
 	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Override
-	protected boolean skipCollectionExecution() {
-		return true;
-	}
-
+	private final boolean objectReuse;
 	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	public ObjectReuseITCase(boolean objectReuse) {
+		super(TestExecutionMode.CLUSTER);
+		this.objectReuse = objectReuse;
+	}
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+	@Test
+	public void testKeyedReduce() throws Exception {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (objectReuse) {
+			env.getConfig().enableObjectReuse();
+		} else {
+			env.getConfig().disableObjectReuse();
 		}
+
+		DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA);
 		
-		return toParameterList(tConfigs);
+		DataSet<Tuple2<String, Integer>> result = input
+			.groupBy(0)
+			.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+
+				@Override
+				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
+					value2.f1 += value1.f1;
+					return value2;
+				}
+			});
+
+		Tuple2<String, Integer> res = result.collect().get(0);
+		assertEquals(new Tuple2<>("a", 60), res);
 	}
-	
-	@SuppressWarnings({"unchecked", "serial"})
-	private static class Progs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
 
-			case 1: {
-				// Grouped reduce
+	@Test
+	public void testGlobalReduce() throws Exception {
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (objectReuse) {
+			env.getConfig().enableObjectReuse();
+		} else {
+			env.getConfig().disableObjectReuse();
+		}
 
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
+		DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA);
 
-				DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+		DataSet<Tuple2<String, Integer>> result = input.reduce(
+			new ReduceFunction<Tuple2<String, Integer>>() {
 
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws
-							Exception {
+				@Override
+				public Tuple2<String, Integer> reduce(
+						Tuple2<String, Integer> value1,
+						Tuple2<String, Integer> value2) {
+					
+					if (value1.f1 % 3 == 0) {
+						value1.f1 += value2.f1;
+						return value1;
+					} else {
 						value2.f1 += value1.f1;
 						return value2;
 					}
+				}
 
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,60\n";
-
-			}
-
-			case 2: {
-				// Global reduce
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
-
-				DataSet<Tuple2<String, Integer>> result = input.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
-							@Override
-							public Tuple2<String, Integer> reduce(
-									Tuple2<String, Integer> value1,
-									Tuple2<String, Integer> value2) throws Exception {
-								if (value1.f1 % 2 == 0) {
-									value1.f1 += value2.f1;
-									return value1;
-								} else {
-									value2.f1 += value1.f1;
-									return value2;
-								}
-							}
-
-						});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,60\n";
-
-			}
+			});
 
-			case 3: {
-				// Add items to list without copying
+		Tuple2<String, Integer> res = result.collect().get(0);
+		assertEquals(new Tuple2<>("a", 60), res);
+	}
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	@Test
+	public void testKeyedGroupReduce() throws Exception {
 
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (objectReuse) {
+			env.getConfig().enableObjectReuse();
+		} else {
+			env.getConfig().disableObjectReuse();
+		}
 
-				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+		DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA);
 
-					@Override
-					public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-						List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
-						for (Tuple2<String, Integer> val : values) {
-							list.add(val);
-						}
+		DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduceGroup(
+			new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
 
-						for (Tuple2<String, Integer> val : list) {
-							out.collect(val);
-						}
+				@Override
+				public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+					List<Tuple2<String, Integer>> list = new ArrayList<>();
+					for (Tuple2<String, Integer> val : values) {
+						list.add(val);
 					}
 
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				if (env.getConfig().isObjectReuseEnabled()) {
-					return "a,5\n" +
-							"a,4\n" +
-							"a,5\n" +
-							"a,4\n" +
-							"a,5\n";
-				} else {
-					return "a,1\n" +
-							"a,2\n" +
-							"a,3\n" +
-							"a,4\n" +
-							"a,5\n";
+					for (Tuple2<String, Integer> val : list) {
+						out.collect(val);
+					}
 				}
+			});
 
-			}
-
-			case 4: {
-				// Add items to list after copying
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		List<Tuple2<String, Integer>> is = result.collect();
+		Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>());
 
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
+		List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ?
+			Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4),
+				new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) :
+			Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+				new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5));
 
-				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
-					@Override
-					public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-						List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
-						for (Tuple2<String, Integer> val : values) {
-							list.add(val.copy());
-						}
+		assertEquals(expected, is);
+	}
+	
+	@Test
+	public void testGlobalGroupReduce() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (objectReuse) {
+			env.getConfig().enableObjectReuse();
+		} else {
+			env.getConfig().disableObjectReuse();
+		}
 
-						for (Tuple2<String, Integer> val : list) {
-							out.collect(val);
-						}
+		DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA);
+		
+		DataSet<Tuple2<String, Integer>> result = input.reduceGroup(
+			new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+
+				@Override
+				public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
+					List<Tuple2<String, Integer>> list = new ArrayList<>();
+					for (Tuple2<String, Integer> val : values) {
+						list.add(val);
 					}
+	
+					for (Tuple2<String, Integer> val : list) {
+						out.collect(val);
+					}
+				}
+			});
 
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,1\n" +
-						"a,2\n" +
-						"a,3\n" +
-						"a,4\n" +
-						"a,5\n";
+		List<Tuple2<String, Integer>> is = result.collect();
+		Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>());
 
-			}
+		List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ?
+			Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4),
+				new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) :
+			Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2),
+				new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5));
+		
+		assertEquals(expected, is);
+	}
 
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
+	@Parameterized.Parameters(name = "Execution mode = CLUSTER, Reuse = {0}")
+	public static Collection<Object[]> executionModes() {
+		return Arrays.asList(
+			new Object[] { false, },
+			new Object[] { true } );
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac1e19e/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 972e451..6e234e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -33,7 +33,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
@@ -58,10 +58,9 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	}
 
 	@Parameterized.Parameters(name = "Execution mode = {0}")
-	public static Collection<TestExecutionMode[]> executionModes(){
-		Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
-		c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
-		return c;
+	public static Collection<Object[]> executionModes() {
+		return Arrays.<Object[]>asList(
+			new Object[] { TestExecutionMode.CLUSTER } );
 	}
 
 	@Test


Mime
View raw message