flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/7] flink git commit: [FLINK-3166] [runtime] The first program in ObjectReuseITCase has the wrong expected result, and it succeeds - TestEnvironment now honors configuration of object reuse - Fixed reduce transformations to allow the user to modify and return either input
Date Sun, 27 Dec 2015 12:13:39 GMT
[FLINK-3166] [runtime] The first program in ObjectReuseITCase has the wrong expected result,
and it succeeds
- TestEnvironment now honors configuration of object reuse
- Fixed reduce transformations to allow the user to modify and return either input

This closes #1464


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c246ff2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c246ff2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c246ff2e

Branch: refs/heads/master
Commit: c246ff2e4623b38f1b05c0b3c0c8cb6ba165a33f
Parents: 7cb25cb
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Dec 16 14:42:06 2015 -0500
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Dec 26 19:08:09 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/DataSourceTask.java | 10 +++-
 .../runtime/operators/ReduceCombineDriver.java  |  5 ++
 .../flink/runtime/operators/ReduceDriver.java   |  5 ++
 .../apache/flink/test/util/TestEnvironment.java | 13 ++++-
 .../javaApiOperators/ObjectReuseITCase.java     | 51 ++++++++++++--------
 5 files changed, 62 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 915f66b..bcf0c2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -151,14 +151,20 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					final Collector<OT> output = this.output;
 
 					if (objectReuseEnabled) {
-						OT reuse = serializer.createInstance();
+						OT reuse1 = serializer.createInstance();
+						OT reuse2 = serializer.createInstance();
+						OT reuse3 = serializer.createInstance();
 
 						// as long as there is data to read
 						while (!this.taskCanceled && !format.reachedEnd()) {
 
 							OT returned;
-							if ((returned = format.nextRecord(reuse)) != null) {
+							if ((returned = format.nextRecord(reuse1)) != null) {
 								output.collect(returned);
+
+								reuse1 = reuse2;
+								reuse2 = reuse3;
+								reuse3 = returned;
 							}
 						}
 					} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index c77e746..07aefc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -219,6 +219,11 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 						if (comparator.equalToReference(value)) {
 							// same group, reduce
 							res = function.reduce(res, value);
+							if (res == reuse2) {
+								T tmp = reuse1;
+								reuse1 = reuse2;
+								reuse2 = tmp;
+							}
 						} else {
 							// new key group
 							break;

http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 6a7c42c..c73293f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -135,6 +135,11 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 					if (comparator.equalToReference(value)) {
 						// same group, reduce
 						res = function.reduce(res, value);
+						if (res == reuse2) {
+							T tmp = reuse1;
+							reuse1 = reuse2;
+							reuse2 = tmp;
+						}
 					} else {
 						// new key group
 						break;

http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7605b3a..7cb88be 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -49,10 +49,21 @@ public class TestEnvironment extends ExecutionEnvironment {
 	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
+
 		// disabled to improve build time
 		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
 	}
 
+	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+		this(executor, parallelism);
+
+		if (isObjectReuseEnabled) {
+			getConfig().enableObjectReuse();
+		} else {
+			getConfig().disableObjectReuse();
+		}
+	}
+
 	@Override
 	public void startNewSession() throws Exception {
 	}
@@ -89,7 +100,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {
-				lastEnv = new TestEnvironment(executor, getParallelism());
+				lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled());
 				return lastEnv;
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
index 9019b8f..2ea4823 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
@@ -104,11 +104,12 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 			switch(progId) {
 
 			case 1: {
+				// Grouped reduce
 
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
 
 				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
+
 				DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 
 					@Override
@@ -124,26 +125,30 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				env.execute();
 
 				// return expected result
-				return "a,100\n";
+				return "a,60\n";
 
 			}
 
 			case 2: {
+				// Global reduce
 
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
 
 				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
 
-				DataSet<Tuple2<String, Integer>> result = input
-						.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+				DataSet<Tuple2<String, Integer>> result = input.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 
 							@Override
 							public Tuple2<String, Integer> reduce(
 									Tuple2<String, Integer> value1,
 									Tuple2<String, Integer> value2) throws Exception {
-								value2.f1 += value1.f1;
-								return value2;
+								if (value1.f1 % 2 == 0) {
+									value1.f1 += value2.f1;
+									return value1;
+								} else {
+									value2.f1 += value1.f1;
+									return value2;
+								}
 							}
 
 						});
@@ -152,14 +157,14 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				env.execute();
 
 				// return expected result
-				return "a,100\n";
+				return "a,60\n";
 
 			}
 
 			case 3: {
+				// Add items to list without copying
 
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
 
 				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
 
@@ -183,18 +188,26 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				env.execute();
 
 				// return expected result
-				return "a,4\n" +
-						"a,4\n" +
-						"a,5\n" +
-						"a,5\n" +
-						"a,5\n";
+				if (env.getConfig().isObjectReuseEnabled()) {
+					return "a,5\n" +
+							"a,4\n" +
+							"a,5\n" +
+							"a,4\n" +
+							"a,5\n";
+				} else {
+					return "a,1\n" +
+							"a,2\n" +
+							"a,3\n" +
+							"a,4\n" +
+							"a,5\n";
+				}
 
 			}
 
 			case 4: {
+				// Add items to list after copying
 
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
 
 				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
 
@@ -204,7 +217,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 					public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
 						List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
 						for (Tuple2<String, Integer> val : values) {
-							list.add(val);
+							list.add(val.copy());
 						}
 
 						for (Tuple2<String, Integer> val : list) {
@@ -218,10 +231,10 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 				env.execute();
 
 				// return expected result
-				return "a,4\n" +
+				return "a,1\n" +
+						"a,2\n" +
+						"a,3\n" +
 						"a,4\n" +
-						"a,5\n" +
-						"a,5\n" +
 						"a,5\n";
 
 			}


Mime
View raw message