Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0BFFD183DC for ; Sun, 27 Dec 2015 12:13:36 +0000 (UTC) Received: (qmail 5164 invoked by uid 500); 27 Dec 2015 12:13:35 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 5076 invoked by uid 500); 27 Dec 2015 12:13:35 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 4972 invoked by uid 99); 27 Dec 2015 12:13:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 27 Dec 2015 12:13:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9051EE03CD; Sun, 27 Dec 2015 12:13:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Sun, 27 Dec 2015 12:13:39 -0000 Message-Id: In-Reply-To: <5e96c1e28449427a8f3f95689d40ce11@git.apache.org> References: <5e96c1e28449427a8f3f95689d40ce11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] flink git commit: [FLINK-3166] [runtime] The first program in ObjectReuseITCase has the wrong expected result, and it succeeds - TestEnvironment now honors configuration of object reuse - Fixed reduce transformations to allow the user to modify and [FLINK-3166] [runtime] The first program in ObjectReuseITCase has the wrong expected result, and it succeeds - TestEnvironment now honors configuration of object reuse - Fixed reduce transformations to allow the user to modify and return either input This closes #1464 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c246ff2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c246ff2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c246ff2e Branch: refs/heads/master Commit: c246ff2e4623b38f1b05c0b3c0c8cb6ba165a33f Parents: 7cb25cb Author: Greg Hogan Authored: Wed Dec 16 14:42:06 2015 -0500 Committer: Stephan Ewen Committed: Sat Dec 26 19:08:09 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/operators/DataSourceTask.java | 10 +++- .../runtime/operators/ReduceCombineDriver.java | 5 ++ .../flink/runtime/operators/ReduceDriver.java | 5 ++ .../apache/flink/test/util/TestEnvironment.java | 13 ++++- .../javaApiOperators/ObjectReuseITCase.java | 51 ++++++++++++-------- 5 files changed, 62 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 915f66b..bcf0c2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -151,14 +151,20 @@ public class DataSourceTask extends AbstractInvokable { final Collector output = this.output; if (objectReuseEnabled) { - OT reuse = serializer.createInstance(); + OT reuse1 = serializer.createInstance(); + OT reuse2 = serializer.createInstance(); + OT reuse3 = serializer.createInstance(); // as long as there is data to read while (!this.taskCanceled && !format.reachedEnd()) { OT returned; - if ((returned = format.nextRecord(reuse)) != null) { + if ((returned = format.nextRecord(reuse1)) != null) { output.collect(returned); + + reuse1 = reuse2; + reuse2 = reuse3; + reuse3 = returned; } } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index c77e746..07aefc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -219,6 +219,11 @@ public class ReduceCombineDriver implements Driver, T> { if (comparator.equalToReference(value)) { // same group, reduce res = function.reduce(res, value); + if (res == reuse2) { + T tmp = reuse1; + reuse1 = reuse2; + reuse2 = tmp; + } } else { // new key group break; http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index 6a7c42c..c73293f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -135,6 +135,11 @@ public class ReduceDriver implements Driver, T> { if (comparator.equalToReference(value)) { // same group, reduce res = function.reduce(res, value); + if (res == reuse2) { + T tmp = reuse1; + reuse1 = reuse2; + reuse2 = tmp; + } } else { // new key group break; http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index 7605b3a..7cb88be 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -49,10 +49,21 @@ public class TestEnvironment extends ExecutionEnvironment { public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { this.executor = executor; setParallelism(parallelism); + // disabled to improve build time getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE); } + public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) { + this(executor, parallelism); + + if (isObjectReuseEnabled) { + getConfig().enableObjectReuse(); + } else { + getConfig().disableObjectReuse(); + } + } + @Override public void startNewSession() throws Exception { } @@ -89,7 +100,7 @@ public class TestEnvironment extends ExecutionEnvironment { ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { @Override public ExecutionEnvironment createExecutionEnvironment() { - lastEnv = new TestEnvironment(executor, getParallelism()); + lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled()); return lastEnv; } }; http://git-wip-us.apache.org/repos/asf/flink/blob/c246ff2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java index 9019b8f..2ea4823 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java @@ -104,11 +104,12 @@ public class ObjectReuseITCase extends JavaProgramTestBase { switch(progId) { case 1: { + // Grouped reduce final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); DataSet> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); + DataSet> result = input.groupBy(0).reduce(new ReduceFunction>() { @Override @@ -124,26 +125,30 @@ public class ObjectReuseITCase extends JavaProgramTestBase { env.execute(); // return expected result - return "a,100\n"; + return "a,60\n"; } case 2: { + // Global reduce final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); DataSet> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); - DataSet> result = input - .reduce(new ReduceFunction>() { + DataSet> result = input.reduce(new ReduceFunction>() { @Override public Tuple2 reduce( Tuple2 value1, Tuple2 value2) throws Exception { - value2.f1 += value1.f1; - return value2; + if (value1.f1 % 2 == 0) { + value1.f1 += value2.f1; + return value1; + } else { + value2.f1 += value1.f1; + return value2; + } } }); @@ -152,14 +157,14 @@ public class ObjectReuseITCase extends JavaProgramTestBase { env.execute(); // return expected result - return "a,100\n"; + return "a,60\n"; } case 3: { + // Add items to list without copying final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); DataSet> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); @@ -183,18 +188,26 @@ public class ObjectReuseITCase extends JavaProgramTestBase { env.execute(); // return expected result - return "a,4\n" + - "a,4\n" + - "a,5\n" + - "a,5\n" + - "a,5\n"; + if (env.getConfig().isObjectReuseEnabled()) { + return "a,5\n" + + "a,4\n" + + "a,5\n" + + "a,4\n" + + "a,5\n"; + } else { + return "a,1\n" + + "a,2\n" + + "a,3\n" + + "a,4\n" + + "a,5\n"; + } } case 4: { + // Add items to list after copying final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); DataSet> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); @@ -204,7 +217,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { public void reduce(Iterable> values, Collector> out) throws Exception { List> list = new ArrayList>(); for (Tuple2 val : values) { - list.add(val); + list.add(val.copy()); } for (Tuple2 val : list) { @@ -218,10 +231,10 @@ public class ObjectReuseITCase extends JavaProgramTestBase { env.execute(); // return expected result - return "a,4\n" + + return "a,1\n" + + "a,2\n" + + "a,3\n" + "a,4\n" + - "a,5\n" + - "a,5\n" + "a,5\n"; }