Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C194B198A5 for ; Wed, 13 Apr 2016 16:12:59 +0000 (UTC) Received: (qmail 9866 invoked by uid 500); 13 Apr 2016 16:12:59 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 9826 invoked by uid 500); 13 Apr 2016 16:12:59 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 9817 invoked by uid 99); 13 Apr 2016 16:12:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Apr 2016 16:12:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86158DFB7C; Wed, 13 Apr 2016 16:12:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3589] Allow setting Operator parallelism to default value Date: Wed, 13 Apr 2016 16:12:59 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 6c061684f -> 5350bc48a [FLINK-3589] Allow setting Operator parallelism to default value Adds the public field ExecutionConfig.PARALLELISM_DEFAULT as a flag value to indicate that the default parallelism should be used. Adds the public field ExecutionConfig.PARALLELISM_UNKNOWN as a flag value to indicate that the parallelism should remain unchanged. 
This closes #1778 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5350bc48 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5350bc48 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5350bc48 Branch: refs/heads/master Commit: 5350bc48af37a1d6296de5b290b870b7d68407fa Parents: 6c06168 Author: Greg Hogan Authored: Wed Mar 9 12:41:45 2016 -0500 Committer: Greg Hogan Committed: Wed Apr 13 11:59:13 2016 -0400 ---------------------------------------------------------------------- .../org/apache/flink/client/LocalExecutor.java | 5 +- .../apache/flink/client/cli/ProgramOptions.java | 3 +- .../client/program/ContextEnvironment.java | 3 +- .../flink/api/common/ExecutionConfig.java | 30 ++++++--- .../java/org/apache/flink/api/common/Plan.java | 17 +++--- .../flink/api/common/operators/Operator.java | 16 ++--- .../flink/api/common/ExecutionConfigTest.java | 33 +++++++++- .../flink/api/java/ExecutionEnvironment.java | 2 +- .../apache/flink/api/java/LocalEnvironment.java | 3 +- .../flink/api/java/operators/DataSink.java | 3 +- .../api/java/operators/DeltaIteration.java | 8 ++- .../flink/api/java/operators/Operator.java | 20 +++--- .../flink/api/java/operator/OperatorTest.java | 64 ++++++++++++++++++++ .../translation/ReduceTranslationTests.java | 5 +- .../flink/optimizer/dag/OptimizerNode.java | 17 +++--- .../flink/runtime/jobgraph/JobVertex.java | 3 +- .../flink/streaming/api/graph/StreamNode.java | 3 +- .../translation/ReduceTranslationTest.scala | 7 ++- 18 files changed, 185 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 25da5c7..65bf5d8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -20,6 +20,7 @@ package org.apache.flink.client; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; @@ -206,7 +207,7 @@ public class LocalExecutor extends PlanExecutor { */ @Override public String getOptimizerPlanAsJSON(Plan plan) throws Exception { - final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism(); + final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism(); Optimizer pc = new Optimizer(new DataStatistics(), this.configuration); pc.setDefaultParallelism(parallelism); @@ -271,7 +272,7 @@ public class LocalExecutor extends PlanExecutor { * @throws Exception Thrown, if the optimization process that creates the execution plan failed. */ public static String optimizerPlanAsJSON(Plan plan) throws Exception { - final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism(); + final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 
1 : plan.getDefaultParallelism(); Optimizer pc = new Optimizer(new DataStatistics(), new Configuration()); pc.setDefaultParallelism(parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index 73d49b5..368ec19 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; +import org.apache.flink.api.common.ExecutionConfig; import java.net.MalformedURLException; import java.net.URL; @@ -103,7 +104,7 @@ public abstract class ProgramOptions extends CommandLineOptions { } } else { - parallelism = -1; + parallelism = ExecutionConfig.PARALLELISM_DEFAULT; } stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt()); http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 987558c..dfb5f2e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.Plan; @@ -78,7 +79,7 @@ public class ContextEnvironment extends ExecutionEnvironment { @Override public String toString() { - return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism()) + return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ") : " + getIdString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 9225cdd..c02bcad 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -67,10 +67,23 @@ public class ExecutionConfig implements Serializable { /** * The constant to use for the parallelism, if the system should use the number - * of currently available slots. + * of currently available slots. */ public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE; + /** + * The flag value indicating use of the default parallelism. This value can + * be used to reset the parallelism back to the default state. + */ + public static final int PARALLELISM_DEFAULT = -1; + + /** + * The flag value indicating an unknown or unset parallelism. This value is + * not a valid parallelism and indicates that the parallelism should remain + * unchanged. 
+ */ + public static final int PARALLELISM_UNKNOWN = -2; + private static final long DEFAULT_RESTART_DELAY = 10000L; // -------------------------------------------------------------------------------------------- @@ -80,7 +93,7 @@ public class ExecutionConfig implements Serializable { private boolean useClosureCleaner = true; - private int parallelism = -1; + private int parallelism = PARALLELISM_DEFAULT; /** * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration @@ -212,7 +225,8 @@ public class ExecutionConfig implements Serializable { * with a parallelism of one (the final reduce to the single result value). * * @return The parallelism used by operations, unless they override that value. This method - * returns {@code -1}, if the environment's default parallelism should be used. + * returns {@link #PARALLELISM_DEFAULT} if the environment's default parallelism + * should be used. */ public int getParallelism() { return parallelism; @@ -231,11 +245,13 @@ public class ExecutionConfig implements Serializable { * @param parallelism The parallelism to use */ public ExecutionConfig setParallelism(int parallelism) { - if (parallelism < 1 && parallelism != -1) { - throw new IllegalArgumentException( - "Parallelism must be at least one, or -1 (use system default)."); + if (parallelism != PARALLELISM_UNKNOWN) { + if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) { + throw new IllegalArgumentException( + "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default)."); + } + this.parallelism = parallelism; } - this.parallelism = parallelism; return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/Plan.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index 
03ada8a..30f2c2f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -52,9 +52,6 @@ import org.apache.flink.util.Visitor; @Internal public class Plan implements Visitable> { - /** The default parallelism indicates to use the cluster's default */ - private static final int DEFAULT_PARALELLISM = -1; - /** * A collection of all sinks in the plan. Since the plan is traversed from the sinks to the sources, this * collection must contain all the sinks. @@ -65,7 +62,7 @@ public class Plan implements Visitable> { protected String jobName; /** The default parallelism to use for nodes that have no explicitly specified parallelism. */ - protected int defaultParallelism = DEFAULT_PARALELLISM; + protected int defaultParallelism = ExecutionConfig.PARALLELISM_DEFAULT; /** Hash map for files in the distributed cache: registered name to cache entry. */ protected HashMap cacheFile = new HashMap<>(); @@ -91,7 +88,7 @@ public class Plan implements Visitable> { * @param jobName The name to display for the job. */ public Plan(Collection> sinks, String jobName) { - this(sinks, jobName, DEFAULT_PARALELLISM); + this(sinks, jobName, ExecutionConfig.PARALLELISM_DEFAULT); } /** @@ -122,7 +119,7 @@ public class Plan implements Visitable> { * @param jobName The name to display for the job. */ public Plan(GenericDataSinkBase sink, String jobName) { - this(sink, jobName, DEFAULT_PARALELLISM); + this(sink, jobName, ExecutionConfig.PARALLELISM_DEFAULT); } /** @@ -152,7 +149,7 @@ public class Plan implements Visitable> { * @param sinks The collection will the sinks of the data flow. */ public Plan(Collection> sinks) { - this(sinks, DEFAULT_PARALELLISM); + this(sinks, ExecutionConfig.PARALLELISM_DEFAULT); } /** @@ -180,7 +177,7 @@ public class Plan implements Visitable> { * @param sink The data sink of the data flow. 
*/ public Plan(GenericDataSinkBase sink) { - this(sink, DEFAULT_PARALELLISM); + this(sink, ExecutionConfig.PARALLELISM_DEFAULT); } /** @@ -287,8 +284,8 @@ public class Plan implements Visitable> { * @param defaultParallelism The default parallelism for the plan. */ public void setDefaultParallelism(int defaultParallelism) { - checkArgument(defaultParallelism >= 1 || defaultParallelism == -1, - "The default parallelism must be positive, or -1 if the system should use the globally configured default."); + checkArgument(defaultParallelism >= 1 || defaultParallelism == ExecutionConfig.PARALLELISM_DEFAULT, + "The default parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT if the system should use the globally configured default."); this.defaultParallelism = defaultParallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index 36f1f85..7e70fd7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -22,6 +22,7 @@ package org.apache.flink.api.common.operators; import java.util.List; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -42,7 +43,7 @@ public abstract class Operator implements Visitable> { protected String name; // the name of the contract instance. optional. - private int parallelism = -1; // the number of parallel instances to use. 
-1, if unknown + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use /** * The return type of the user function. @@ -162,9 +163,10 @@ public abstract class Operator implements Visitable> { } /** - * Gets the parallelism for this contract instance. The parallelism denotes - * how many parallel instances of the user function will be spawned during the execution. If this - * value is -1, then the system will decide the number of parallel instances by itself. + * Gets the parallelism for this contract instance. The parallelism denotes how many + * parallel instances of the user function will be spawned during the execution. If this + * value is {@link ExecutionConfig#PARALLELISM_DEFAULT}, then the system will decide the + * number of parallel instances by itself. * * @return The parallelism. */ @@ -174,10 +176,10 @@ public abstract class Operator implements Visitable> { /** * Sets the parallelism for this contract instance. The parallelism denotes - * how many parallel instances of the user function will be spawned during the execution. Set this - * value to -1 to let the system decide on its own. + * how many parallel instances of the user function will be spawned during the execution. * - * @param parallelism The number of parallel instances to spawn. -1, if unspecified. + * @param parallelism The number of parallel instances to spawn. Set this value to + * {@link ExecutionConfig#PARALLELISM_DEFAULT} to let the system decide on its own. 
*/ public void setParallelism(int parallelism) { this.parallelism = parallelism; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index 39b3b71..158d971 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -18,12 +18,13 @@ package org.apache.flink.api.common; -import static org.junit.Assert.*; +import org.junit.Test; import java.util.Arrays; import java.util.List; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ExecutionConfigTest { @@ -45,4 +46,32 @@ public class ExecutionConfigTest { assertTrue(counter == expectedTypes.size()); } + + @Test + public void testConfigurationOfParallelism() { + ExecutionConfig config = new ExecutionConfig(); + + // verify that PARALLELISM_UNKNOWN does not change initial parallelism + int parallelism = config.getParallelism(); + config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN); + + assertEquals(parallelism, config.getParallelism()); + + // verify explicit change in parallelism + parallelism = 36; + config.setParallelism(parallelism); + + assertEquals(parallelism, config.getParallelism()); + + // verify that PARALLELISM_UNKNOWN does not change configured parallelism + config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN); + + assertEquals(parallelism, config.getParallelism()); + + // verify that parallelism is reset to default flag value + parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + config.setParallelism(parallelism); + + assertEquals(parallelism, config.getParallelism()); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 89c817d..d108d3e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -160,7 +160,7 @@ public abstract class ExecutionEnvironment { * set will insert eventually an operation that runs non-parallel (parallelism of one). * * @return The parallelism used by operations, unless they override that value. This method - * returns {@code -1}, if the environments default parallelism should be used. + * returns {@link ExecutionConfig#PARALLELISM_DEFAULT}, if the environment's default parallelism should be used. 
*/ public int getParallelism() { return config.getParallelism(); http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java index d1fe298..0b2567a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; @@ -137,7 +138,7 @@ public class LocalEnvironment extends ExecutionEnvironment { @Override public String toString() { - return "Local Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism()) + return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 
"default" : getParallelism()) + ") : " + getIdString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 37f6cc2..d4b3e3b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.GenericDataSinkBase; @@ -47,7 +48,7 @@ public class DataSink { private String name; - private int parallelism = -1; + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; private Configuration parameters; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index cc1cd66..ccf88a6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.AggregatorRegistry; @@ -59,7 +60,7 @@ public class DeltaIteration { private String name; - private int parallelism = -1; + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; private boolean solutionSetUnManaged; @@ -176,7 +177,8 @@ public class DeltaIteration { * @return The iteration object, for function call chaining. */ public DeltaIteration parallelism(int parallelism) { - Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default)."); + Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT, + "The parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT (use default)."); this.parallelism = parallelism; return this; } @@ -184,7 +186,7 @@ public class DeltaIteration { /** * Gets the iteration's parallelism. * - * @return The iterations parallelism, or -1, if not set. + * @return The iteration's parallelism, or {@link ExecutionConfig#PARALLELISM_DEFAULT} if not set. 
*/ public int getParallelism() { return parallelism; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 971cba8..197c6d2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -34,7 +35,7 @@ public abstract class Operator> extends DataSet< protected String name; - protected int parallelism = -1; + protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; protected Operator(ExecutionEnvironment context, TypeInformation resultType) { super(context, resultType); @@ -85,17 +86,22 @@ public abstract class Operator> extends DataSet< /** * Sets the parallelism for this operator. - * The degree must be 1 or more. + * The parallelism must be 1 or more, or one of the flag values described below. * - * @param parallelism The parallelism for this operator. + * @param parallelism The parallelism for this operator. A value equal to {@link ExecutionConfig#PARALLELISM_DEFAULT} + * will use the system default and a value equal to {@link ExecutionConfig#PARALLELISM_UNKNOWN} will leave + * the parallelism unchanged. * @return The operator with set parallelism.
*/ public O setParallelism(int parallelism) { - if(parallelism < 1) { - throw new IllegalArgumentException("The parallelism of an operator must be at least 1."); + if (parallelism != ExecutionConfig.PARALLELISM_UNKNOWN) { + if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) { + throw new IllegalArgumentException("The parallelism of an operator must be at least 1."); + } + + this.parallelism = parallelism; } - this.parallelism = parallelism; - + @SuppressWarnings("unchecked") O returnType = (O) this; return returnType; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java new file mode 100644 index 0000000..4a17ca9 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.operator; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.Operator; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class OperatorTest { + + @Test + public void testConfigurationOfParallelism() { + Operator operator = new MockOperator(); + + // verify that PARALLELISM_UNKNOWN does not change initial parallelism + int parallelism = operator.getParallelism(); + operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN); + + assertEquals(parallelism, operator.getParallelism()); + + // verify explicit change in parallelism + parallelism = 36; + operator.setParallelism(parallelism); + + assertEquals(parallelism, operator.getParallelism()); + + // verify that PARALLELISM_UNKNOWN does not change configured parallelism + operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN); + + assertEquals(parallelism, operator.getParallelism()); + + // verify that parallelism is reset to default flag value + parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + operator.setParallelism(parallelism); + + assertEquals(parallelism, operator.getParallelism()); + } + + private class MockOperator extends Operator { + public MockOperator() { + super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java index b555844..3adbbb8 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators.translation; import static org.junit.Assert.*; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.GenericDataSourceBase; @@ -71,7 +72,7 @@ public class ReduceTranslationTests implements java.io.Serializable { assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0); // parallelism was not configured on the operator - assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1); + assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT); assertTrue(reducer.getInput() instanceof GenericDataSourceBase); } @@ -110,7 +111,7 @@ public class ReduceTranslationTests implements java.io.Serializable { assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); // parallelism was not configured on the operator - assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == -1); + assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT); // check keys assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0)); http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java index a3bf995..cff8f18 100644 --- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.api.common.operators.AbstractUdfOperator; import org.apache.flink.api.common.operators.CompilerHints; @@ -98,7 +99,7 @@ public abstract class OptimizerNode implements Visitable, Estimat // --------------------------------- General Parameters --------------------------------------- - private int parallelism = -1; // the number of parallel instances of this node + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances of this node private long minimalMemoryPerSubTask = -1; @@ -372,8 +373,8 @@ public abstract class OptimizerNode implements Visitable, Estimat /** * Gets the parallelism for the operator represented by this optimizer node. * The parallelism denotes how many parallel instances of the operator on will be - * spawned during the execution. If this value is -1, then the system will take - * the default number of parallel instances. + * spawned during the execution. If this value is {@link ExecutionConfig#PARALLELISM_DEFAULT} + * then the system will take the default number of parallel instances. * * @return The parallelism of the operator. */ @@ -384,14 +385,14 @@ public abstract class OptimizerNode implements Visitable, Estimat /** * Sets the parallelism for this optimizer node. * The parallelism denotes how many parallel instances of the operator will be - * spawned during the execution. If this value is set to -1, then the system will take - * the default number of parallel instances. + * spawned during the execution. * - * @param parallelism The parallelism to set. 
- * @throws IllegalArgumentException If the parallelism is smaller than one and not -1. + * @param parallelism The parallelism to set. If this value is {@link ExecutionConfig#PARALLELISM_DEFAULT} + * then the system will take the default number of parallel instances. + * @throws IllegalArgumentException If the parallelism is smaller than one. */ public void setParallelism(int parallelism) { - if (parallelism < 1 && parallelism != -1) { + if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) { throw new IllegalArgumentException("Parallelism of " + parallelism + " is invalid."); } this.parallelism = parallelism; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 9018029..acb22aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -55,7 +56,7 @@ public class JobVertex implements java.io.Serializable { private final ArrayList inputs = new ArrayList(); /** Number of subtasks to split this task into at runtime.*/ - private int parallelism = -1; + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; /** Custom configuration passed to the assigned task at runtime. 
*/ private Configuration configuration; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 533f1e1..430760b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -129,7 +130,7 @@ public class StreamNode implements Serializable { } public int getParallelism() { - if (parallelism == -1) { + if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { return env.getParallelism(); } else { return parallelism; http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala index 5d3878c..3cdb88c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala @@ -29,6 +29,7 @@ import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase import org.junit.Test import org.apache.flink.api.scala._ +import org.apache.flink.api.common.ExecutionConfig class ReduceTranslationTest { @Test @@ -52,7 +53,8 @@ class ReduceTranslationTest { assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType) assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType) assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0) - assertTrue(reducer.getParallelism == 1 || reducer.getParallelism == -1) + assertTrue(reducer.getParallelism == 1 || + reducer.getParallelism == ExecutionConfig.PARALLELISM_DEFAULT) assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]]) } catch { @@ -81,7 +83,8 @@ class ReduceTranslationTest { val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]] assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType) assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType) - assertTrue(reducer.getParallelism == parallelism || reducer.getParallelism == -1) + assertTrue(reducer.getParallelism == parallelism || + reducer.getParallelism == ExecutionConfig.PARALLELISM_DEFAULT) assertArrayEquals(Array[Int](2), reducer.getKeyColumns(0)) assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]]) }