flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject flink git commit: [FLINK-3589] Allow setting Operator parallelism to default value
Date Wed, 13 Apr 2016 16:12:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6c061684f -> 5350bc48a


[FLINK-3589] Allow setting Operator parallelism to default value

Adds the public field ExecutionConfig.PARALLELISM_DEFAULT as a flag
value to indicate that the default parallelism should be used.

Adds the public field ExecutionConfig.PARALLELISM_UNKNOWN as a flag
value to indicate that the parallelism should remain unchanged.

This closes #1778


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5350bc48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5350bc48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5350bc48

Branch: refs/heads/master
Commit: 5350bc48af37a1d6296de5b290b870b7d68407fa
Parents: 6c06168
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Mar 9 12:41:45 2016 -0500
Committer: Greg Hogan <code@greghogan.com>
Committed: Wed Apr 13 11:59:13 2016 -0400

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |  5 +-
 .../apache/flink/client/cli/ProgramOptions.java |  3 +-
 .../client/program/ContextEnvironment.java      |  3 +-
 .../flink/api/common/ExecutionConfig.java       | 30 ++++++---
 .../java/org/apache/flink/api/common/Plan.java  | 17 +++---
 .../flink/api/common/operators/Operator.java    | 16 ++---
 .../flink/api/common/ExecutionConfigTest.java   | 33 +++++++++-
 .../flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../apache/flink/api/java/LocalEnvironment.java |  3 +-
 .../flink/api/java/operators/DataSink.java      |  3 +-
 .../api/java/operators/DeltaIteration.java      |  8 ++-
 .../flink/api/java/operators/Operator.java      | 20 +++---
 .../flink/api/java/operator/OperatorTest.java   | 64 ++++++++++++++++++++
 .../translation/ReduceTranslationTests.java     |  5 +-
 .../flink/optimizer/dag/OptimizerNode.java      | 17 +++---
 .../flink/runtime/jobgraph/JobVertex.java       |  3 +-
 .../flink/streaming/api/graph/StreamNode.java   |  3 +-
 .../translation/ReduceTranslationTest.scala     |  7 ++-
 18 files changed, 185 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 25da5c7..65bf5d8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -206,7 +207,7 @@ public class LocalExecutor extends PlanExecutor {
 	 */
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+		final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();
 
 		Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
 		pc.setDefaultParallelism(parallelism);
@@ -271,7 +272,7 @@ public class LocalExecutor extends PlanExecutor {
 	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
 	 */
 	public static String optimizerPlanAsJSON(Plan plan) throws Exception {
-		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+		final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();
 
 		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
 		pc.setDefaultParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 73d49b5..368ec19 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -18,6 +18,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.ExecutionConfig;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -103,7 +104,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 			}
 		}
 		else {
-			parallelism = -1;
+			parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 		}
 
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 987558c..dfb5f2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -78,7 +79,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 	@Override
 	public String toString() {
-		return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
+		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
 				+ ") : " + getIdString();
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9225cdd..c02bcad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -67,10 +67,23 @@ public class ExecutionConfig implements Serializable {
 
 	/**
 	 * The constant to use for the parallelism, if the system should use the number
-	 *  of currently available slots.
+	 * of currently available slots.
 	 */
 	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
 
+	/**
+	 * The flag value indicating use of the default parallelism. This value can
+	 * be used to reset the parallelism back to the default state.
+	 */
+	public static final int PARALLELISM_DEFAULT = -1;
+
+	/**
+	 * The flag value indicating an unknown or unset parallelism. This value is
+	 * not a valid parallelism and indicates that the parallelism should remain
+	 * unchanged.
+	 */
+	public static final int PARALLELISM_UNKNOWN = -2;
+
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
 	// --------------------------------------------------------------------------------------------
@@ -80,7 +93,7 @@ public class ExecutionConfig implements Serializable {
 
 	private boolean useClosureCleaner = true;
 
-	private int parallelism = -1;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -212,7 +225,8 @@ public class ExecutionConfig implements Serializable {
 	 * with a parallelism of one (the final reduce to the single result value).
 	 *
 	 * @return The parallelism used by operations, unless they override that value. This method
-	 *         returns {@code -1}, if the environment's default parallelism should be used.
+	 *         returns {@link #PARALLELISM_DEFAULT} if the environment's default parallelism
+	 *         should be used.
 	 */
 	public int getParallelism() {
 		return parallelism;
@@ -231,11 +245,13 @@ public class ExecutionConfig implements Serializable {
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		if (parallelism < 1 && parallelism != -1) {
-			throw new IllegalArgumentException(
-					"Parallelism must be at least one, or -1 (use system default).");
+		if (parallelism != PARALLELISM_UNKNOWN) {
+			if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
+				throw new IllegalArgumentException(
+					"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
+			}
+			this.parallelism = parallelism;
 		}
-		this.parallelism = parallelism;
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 03ada8a..30f2c2f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -52,9 +52,6 @@ import org.apache.flink.util.Visitor;
 @Internal
 public class Plan implements Visitable<Operator<?>> {
 
-	/** The default parallelism indicates to use the cluster's default */
-	private static final int DEFAULT_PARALELLISM = -1;
-	
 	/**
 	 * A collection of all sinks in the plan. Since the plan is traversed from the sinks to the sources, this
 	 * collection must contain all the sinks.
@@ -65,7 +62,7 @@ public class Plan implements Visitable<Operator<?>> {
 	protected String jobName;
 
 	/** The default parallelism to use for nodes that have no explicitly specified parallelism. */
-	protected int defaultParallelism = DEFAULT_PARALELLISM;
+	protected int defaultParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 	
 	/** Hash map for files in the distributed cache: registered name to cache entry. */
 	protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<>();
@@ -91,7 +88,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 * @param jobName The name to display for the job.
 	 */
 	public Plan(Collection<? extends GenericDataSinkBase<?>> sinks, String jobName) {
-		this(sinks, jobName, DEFAULT_PARALELLISM);
+		this(sinks, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -122,7 +119,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 * @param jobName The name to display for the job.
 	 */
 	public Plan(GenericDataSinkBase<?> sink, String jobName) {
-		this(sink, jobName, DEFAULT_PARALELLISM);
+		this(sink, jobName, ExecutionConfig.PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -152,7 +149,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 * @param sinks The collection will the sinks of the data flow.
 	 */
 	public Plan(Collection<? extends GenericDataSinkBase<?>> sinks) {
-		this(sinks, DEFAULT_PARALELLISM);
+		this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -180,7 +177,7 @@ public class Plan implements Visitable<Operator<?>> {
 	 * @param sink The data sink of the data flow.
 	 */
 	public Plan(GenericDataSinkBase<?> sink) {
-		this(sink, DEFAULT_PARALELLISM);
+		this(sink, ExecutionConfig.PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -287,8 +284,8 @@ public class Plan implements Visitable<Operator<?>> {
 	 * @param defaultParallelism The default parallelism for the plan.
 	 */
 	public void setDefaultParallelism(int defaultParallelism) {
-		checkArgument(defaultParallelism >= 1 || defaultParallelism == -1,
-			"The default parallelism must be positive, or -1 if the system should use the globally configured default.");
+		checkArgument(defaultParallelism >= 1 || defaultParallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+			"The default parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT if the system should use the globally configured default.");
 		
 		this.defaultParallelism = defaultParallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 36f1f85..7e70fd7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -22,6 +22,7 @@ package org.apache.flink.api.common.operators;
 import java.util.List;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -42,7 +43,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	
 	protected String name;								// the name of the contract instance. optional.
 		
-	private int parallelism = -1;				// the number of parallel instances to use. -1, if unknown
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the number of parallel instances to use
 
 	/**
 	 * The return type of the user function.
@@ -162,9 +163,10 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	}
 
 	/**
-	 * Gets the parallelism for this contract instance. The parallelism denotes
-	 * how many parallel instances of the user function will be spawned during the execution. If this
-	 * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
+	 * Gets the parallelism for this contract instance. The parallelism denotes how many
+	 * parallel instances of the user function will be spawned during the execution. If this
+	 * value is {@link ExecutionConfig#PARALLELISM_DEFAULT}, then the system will decide the
+	 * number of parallel instances by itself.
 	 *
 	 * @return The parallelism.
 	 */
@@ -174,10 +176,10 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 
 	/**
 	 * Sets the parallelism for this contract instance. The parallelism denotes
-	 * how many parallel instances of the user function will be spawned during the execution. Set this
-	 * value to <code>-1</code> to let the system decide on its own.
+	 * how many parallel instances of the user function will be spawned during the execution.
 	 *
-	 * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
+	 * @param parallelism The number of parallel instances to spawn. Set this value to
+	 *        {@link ExecutionConfig#PARALLELISM_DEFAULT} to let the system decide on its own.
 	 */
 	public void setParallelism(int parallelism) {
 		this.parallelism = parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 39b3b71..158d971 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.common;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ExecutionConfigTest {
 
@@ -45,4 +46,32 @@ public class ExecutionConfigTest {
 
 		assertTrue(counter == expectedTypes.size());
 	}
+
+	@Test
+	public void testConfigurationOfParallelism() {
+		ExecutionConfig config = new ExecutionConfig();
+
+		// verify that PARALLELISM_UNKNOWN does not change initial parallelism
+		int parallelism = config.getParallelism();
+		config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
+
+		assertEquals(parallelism, config.getParallelism());
+
+		// verify explicit change in parallelism
+		parallelism = 36;
+		config.setParallelism(parallelism);
+
+		assertEquals(parallelism, config.getParallelism());
+
+		// verify that PARALLELISM_UNKNOWN does not change configured parallelism
+		config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
+
+		assertEquals(parallelism, config.getParallelism());
+
+		// verify that parallelism is reset to default flag value
+		parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+		config.setParallelism(parallelism);
+
+		assertEquals(parallelism, config.getParallelism());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 89c817d..d108d3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -160,7 +160,7 @@ public abstract class ExecutionEnvironment {
 	 * set will insert eventually an operation that runs non-parallel (parallelism of one).
 	 *
 	 * @return The parallelism used by operations, unless they override that value. This method
-	 *         returns {@code -1}, if the environments default parallelism should be used.
+	 *         returns {@link ExecutionConfig#PARALLELISM_DEFAULT}, if the environment's default parallelism should be used.
 	 */
 	public int getParallelism() {
 		return config.getParallelism();

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index d1fe298..0b2567a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -137,7 +138,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	
 	@Override
 	public String toString() {
-		return "Local Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
+		return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
 				+ ") : " + getIdString();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 37f6cc2..d4b3e3b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -47,7 +48,7 @@ public class DataSink<T> {
 	
 	private String name;
 	
-	private int parallelism = -1;
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	private Configuration parameters;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index cc1cd66..ccf88a6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
@@ -59,7 +60,7 @@ public class DeltaIteration<ST, WT> {
 	
 	private String name;
 	
-	private int parallelism = -1;
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 	
 	private boolean solutionSetUnManaged;
 	
@@ -176,7 +177,8 @@ public class DeltaIteration<ST, WT> {
 	 * @return The iteration object, for function call chaining.
 	 */
 	public DeltaIteration<ST, WT> parallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
+		Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+			"The parallelism must be positive, or ExecutionConfig.PARALLELISM_DEFAULT (use default).");
 		this.parallelism = parallelism;
 		return this;
 	}
@@ -184,7 +186,7 @@ public class DeltaIteration<ST, WT> {
 	/**
 	 * Gets the iteration's parallelism.
 	 * 
-	 * @return The iterations parallelism, or -1, if not set.
+	 * @return The iteration's parallelism, or {@link ExecutionConfig#PARALLELISM_DEFAULT} if not set.
 	 */
 	public int getParallelism() {
 		return parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 971cba8..197c6d2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,7 +35,7 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 
 	protected String name;
 	
-	protected int parallelism = -1;
+	protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);
@@ -85,17 +86,22 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	
 	/**
 	 * Sets the parallelism for this operator.
-	 * The degree must be 1 or more.
+	 * The parallelism must be 1 or more.
 	 * 
-	 * @param parallelism The parallelism for this operator.
+	 * @param parallelism The parallelism for this operator. A value equal to {@link ExecutionConfig#PARALLELISM_DEFAULT}
+	 *        will use the system default and a value equal to {@link ExecutionConfig#PARALLELISM_UNKNOWN} will leave
+	 *        the parallelism unchanged.
 	 * @return The operator with set parallelism.
 	 */
 	public O setParallelism(int parallelism) {
-		if(parallelism < 1) {
-			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+		if (parallelism != ExecutionConfig.PARALLELISM_UNKNOWN) {
+			if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
+				throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+			}
+
+			this.parallelism = parallelism;
 		}
-		this.parallelism = parallelism;
-		
+
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;
 		return returnType;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
new file mode 100644
index 0000000..4a17ca9
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class OperatorTest {
+
+	@Test
+	public void testConfigurationOfParallelism() {
+		Operator operator = new MockOperator();
+
+		// verify that PARALLELISM_UNKNOWN does not change initial parallelism
+		int parallelism = operator.getParallelism();
+		operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
+
+		assertEquals(parallelism, operator.getParallelism());
+
+		// verify explicit change in parallelism
+		parallelism = 36;
+		operator.setParallelism(parallelism);
+
+		assertEquals(parallelism, operator.getParallelism());
+
+		// verify that PARALLELISM_UNKNOWN does not change configured parallelism
+		operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
+
+		assertEquals(parallelism, operator.getParallelism());
+
+		// verify that parallelism is reset to default flag value
+		parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+		operator.setParallelism(parallelism);
+
+		assertEquals(parallelism, operator.getParallelism());
+	}
+
+	private class MockOperator extends Operator {
+		public MockOperator() {
+			super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index b555844..3adbbb8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators.translation;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
@@ -71,7 +72,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
 			
 			// parallelism was not configured on the operator
-			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
 			
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
@@ -110,7 +111,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
 			
 			// parallelism was not configured on the operator
-			assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == -1);
+			assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
 			
 			// check keys
 			assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index a3bf995..cff8f18 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.operators.AbstractUdfOperator;
 import org.apache.flink.api.common.operators.CompilerHints;
@@ -98,7 +99,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 
 	// --------------------------------- General Parameters ---------------------------------------
 	
-	private int parallelism = -1; // the number of parallel instances of this node
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances of this node
 
 	private long minimalMemoryPerSubTask = -1;
 
@@ -372,8 +373,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	/**
 	 * Gets the parallelism for the operator represented by this optimizer node.
 	 * The parallelism denotes how many parallel instances of the operator on will be
-	 * spawned during the execution. If this value is <code>-1</code>, then the system will take
-	 * the default number of parallel instances.
+	 * spawned during the execution. If this value is {@link ExecutionConfig#PARALLELISM_DEFAULT}
+	 * then the system will take the default number of parallel instances.
 	 * 
 	 * @return The parallelism of the operator.
 	 */
@@ -384,14 +385,14 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	/**
 	 * Sets the parallelism for this optimizer node.
 	 * The parallelism denotes how many parallel instances of the operator will be
-	 * spawned during the execution. If this value is set to <code>-1</code>, then the system will take
-	 * the default number of parallel instances.
+	 * spawned during the execution.
 	 * 
-	 * @param parallelism The parallelism to set.
-	 * @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
+	 * @param parallelism The parallelism to set. If this value is {@link ExecutionConfig#PARALLELISM_DEFAULT}
+	 *        then the system will take the default number of parallel instances.
+	 * @throws IllegalArgumentException If the parallelism is smaller than one.
 	 */
 	public void setParallelism(int parallelism) {
-		if (parallelism < 1 && parallelism != -1) {
+		if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
 			throw new IllegalArgumentException("Parallelism of " + parallelism + " is invalid.");
 		}
 		this.parallelism = parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 9018029..acb22aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -55,7 +56,7 @@ public class JobVertex implements java.io.Serializable {
 	private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
 
 	/** Number of subtasks to split this task into at runtime.*/
-	private int parallelism = -1;
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	/** Custom configuration passed to the assigned task at runtime. */
 	private Configuration configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 533f1e1..430760b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -129,7 +130,7 @@ public class StreamNode implements Serializable {
 	}
 
 	public int getParallelism() {
-		if (parallelism == -1) {
+		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
 			return env.getParallelism();
 		} else {
 			return parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
index 5d3878c..3cdb88c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase
 import org.junit.Test
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.common.ExecutionConfig
 
 class ReduceTranslationTest {
   @Test
@@ -52,7 +53,8 @@ class ReduceTranslationTest {
       assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType)
       assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType)
       assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0)
-      assertTrue(reducer.getParallelism == 1 || reducer.getParallelism == -1)
+      assertTrue(reducer.getParallelism == 1 ||
+        reducer.getParallelism == ExecutionConfig.PARALLELISM_DEFAULT)
       assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
     }
     catch {
@@ -81,7 +83,8 @@ class ReduceTranslationTest {
      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
       assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType)
       assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType)
-      assertTrue(reducer.getParallelism == parallelism || reducer.getParallelism == -1)
+      assertTrue(reducer.getParallelism == parallelism ||
+        reducer.getParallelism == ExecutionConfig.PARALLELISM_DEFAULT)
       assertArrayEquals(Array[Int](2), reducer.getKeyColumns(0))
       assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
     }


Mime
View raw message