flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/9] flink git commit: [FLINK-1679] deprecate API methods to set the parallelism
Date Mon, 23 Mar 2015 08:09:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master 35f34162a -> 126f9f799


[FLINK-1679] deprecate API methods to set the parallelism


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d994d2e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d994d2e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d994d2e6

Branch: refs/heads/master
Commit: d994d2e6a0c37ed3156fdd670aca1bebb839983d
Parents: 35f3416
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Mar 18 10:44:42 2015 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Mar 23 08:56:18 2015 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 39 ++++++++++++++-
 .../flink/api/common/operators/Operator.java    | 41 ++++++++++++---
 .../flink/api/java/CollectionEnvironment.java   | 11 ++++-
 .../flink/api/java/ExecutionEnvironment.java    | 43 ++++++++++++++--
 .../flink/api/scala/ExecutionEnvironment.scala  | 26 ++++++++--
 .../spargel/java/record/SpargelIteration.java   | 14 ++++--
 .../environment/StreamExecutionEnvironment.java | 52 ++++++++++++++++----
 .../api/scala/StreamExecutionEnvironment.scala  | 26 ++++++++--
 8 files changed, 217 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index aa025c1..a2df438 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -137,8 +137,25 @@ public class ExecutionConfig implements Serializable {
 	 *
 	 * @return The parallelism used by operations, unless they override that value. This method
 	 *         returns {@code -1}, if the environment's default parallelism should be used.
+	 * @deprecated Please use {@link #getParallelism}
 	 */
+	@Deprecated
 	public int getDegreeOfParallelism() {
+		return getParallelism();
+	}
+
+	/**
+	 * Gets the parallelism with which operation are executed by default. Operations can
+	 * individually override this value to use a specific parallelism.
+	 *
+	 * Other operations may need to run with a different parallelism - for example calling
+	 * a reduce operation over the entire data set will involve an operation that runs
+	 * with a parallelism of one (the final reduce to the single result value).
+	 *
+	 * @return The parallelism used by operations, unless they override that value. This method
+	 *         returns {@code -1}, if the environment's default parallelism should be used.
+	 */
+	public int getParallelism() {
 		return parallelism;
 	}
 
@@ -150,14 +167,32 @@ public class ExecutionConfig implements Serializable {
 	 * This method overrides the default parallelism for this environment.
 	 * The local execution environment uses by default a value equal to the number of hardware
 	 * contexts (CPU cores / threads). When executing the program via the command line client
-	 * from a JAR file, the default degree of parallelism is the one configured for that setup.
+	 * from a JAR file, the default parallelism is the one configured for that setup.
 	 *
 	 * @param parallelism The parallelism to use
+	 * @deprecated Please use {@link #setParallelism}
 	 */
+	@Deprecated
 	public ExecutionConfig setDegreeOfParallelism(int parallelism) {
+		return setParallelism(parallelism);
+	}
+
+	/**
+	 * Sets the parallelism for operations executed through this environment.
+	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
+	 * x parallel instances.
+	 * <p>
+	 * This method overrides the default parallelism for this environment.
+	 * The local execution environment uses by default a value equal to the number of hardware
+	 * contexts (CPU cores / threads). When executing the program via the command line client
+	 * from a JAR file, the default parallelism is the one configured for that setup.
+	 *
+	 * @param parallelism The parallelism to use
+	 */
+	public ExecutionConfig setParallelism(int parallelism) {
 		if (parallelism < 1 && parallelism != -1) {
 			throw new IllegalArgumentException(
-					"Degree of parallelism must be at least one, or -1 (use system default).");
+					"Parallelism must be at least one, or -1 (use system default).");
 		}
 		this.parallelism = parallelism;
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 85b352a..840c253 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -40,7 +40,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	
 	protected String name;								// the name of the contract instance. optional.
 		
-	private int degreeOfParallelism = -1;				// the number of parallel instances to use. -1, if unknown
+	private int parallelism = -1;				// the number of parallel instances to use. -1, if unknown
 
 	/**
 	 * The return type of the user function.
@@ -160,25 +160,50 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	}
 
 	/**
-	 * Gets the degree of parallelism for this contract instance. The degree of parallelism denotes
+	 * Gets the parallelism for this contract instance. The parallelism denotes
 	 * how many parallel instances of the user function will be spawned during the execution. If this
 	 * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
 	 * 
-	 * @return The degree of parallelism.
+	 * @return The parallelism.
+	 * @deprecated Please use {@link #getParallelism}
 	 */
+	@Deprecated
 	public int getDegreeOfParallelism() {
-		return this.degreeOfParallelism;
+		return getParallelism();
 	}
 
 	/**
-	 * Sets the degree of parallelism for this contract instance. The degree of parallelism denotes
+	 * Gets the parallelism for this contract instance. The parallelism denotes
+	 * how many parallel instances of the user function will be spawned during the execution. If this
+	 * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
+	 *
+	 * @return The parallelism.
+	 */
+	public int getParallelism() {
+		return this.parallelism;
+	}
+
+	/**
+	 * Sets the parallelism for this contract instance. The parallelism denotes
 	 * how many parallel instances of the user function will be spawned during the execution. Set this
 	 * value to <code>-1</code> to let the system decide on its own.
 	 * 
-	 * @param degree The number of parallel instances to spawn. -1, if unspecified.
+	 * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
+	 * @deprecated Please use {@link #setParallelism}
+	 */
+	@Deprecated
+	public void setDegreeOfParallelism(int parallelism) {
+		setParallelism(parallelism);
+	}
+	/**
+	 * Sets the parallelism for this contract instance. The parallelism denotes
+	 * how many parallel instances of the user function will be spawned during the execution. Set this
+	 * value to <code>-1</code> to let the system decide on its own.
+	 *
+	 * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
 	 */
-	public void setDegreeOfParallelism(int degree) {
-		this.degreeOfParallelism = degree;
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index c61d624..b48debc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -32,9 +32,18 @@ public class CollectionEnvironment extends ExecutionEnvironment {
 		CollectionExecutor exec = new CollectionExecutor(getConfig());
 		return exec.execute(p);
 	}
-	
+
+	/**
+	 * @deprecated Please use {@link #getParallelism}
+	 */
 	@Override
+	@Deprecated
 	public int getDegreeOfParallelism() {
+		return getParallelism();
+	}
+
+	@Override
+	public int getParallelism() {
 		return 1; // always serial
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 929bf6e..aac3147 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -142,9 +142,26 @@ public abstract class ExecutionEnvironment {
 	 * 
 	 * @return The degree of parallelism used by operations, unless they override that value. This method
 	 *         returns {@code -1}, if the environments default parallelism should be used.
+	 * @deprecated Please use {@link #getParallelism}
 	 */
+	@Deprecated
 	public int getDegreeOfParallelism() {
-		return config.getDegreeOfParallelism();
+		return getParallelism();
+	}
+
+	/**
+	 * Gets the parallelism with which operation are executed by default. Operations can
+	 * individually override this value to use a specific parallelism via
+	 * {@link Operator#setParallelism(int)}. Other operations may need to run with a different
+	 * parallelism - for example calling
+	 * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
+	 * set will insert eventually an operation that runs non-parallel (parallelism of one).
+	 *
+	 * @return The parallelism used by operations, unless they override that value. This method
+	 *         returns {@code -1}, if the environments default parallelism should be used.
+	 */
+	public int getParallelism() {
+		return config.getParallelism();
 	}
 	
 	/**
@@ -157,10 +174,28 @@ public abstract class ExecutionEnvironment {
 	 * contexts (CPU cores / threads). When executing the program via the command line client
 	 * from a JAR file, the default degree of parallelism is the one configured for that setup.
 	 * 
-	 * @param degreeOfParallelism The degree of parallelism
+	 * @param parallelism The parallelism
+	 * @deprecated Please use {@link #setParallelism}
+	 */
+	@Deprecated
+	public void setDegreeOfParallelism(int parallelism) {
+		setParallelism(parallelism);
+	}
+
+	/**
+	 * Sets the parallelism for operations executed through this environment.
+	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
+	 * x parallel instances.
+	 * <p>
+	 * This method overrides the default parallelism for this environment.
+	 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
+	 * contexts (CPU cores / threads). When executing the program via the command line client
+	 * from a JAR file, the default parallelism is the one configured for that setup.
+	 *
+	 * @param parallelism The parallelism
 	 */
-	public void setDegreeOfParallelism(int degreeOfParallelism) {
-		config.setDegreeOfParallelism(degreeOfParallelism);
+	public void setParallelism(int parallelism) {
+		config.setParallelism(parallelism);
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index f49aef1..cccea78 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -82,17 +82,35 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
    * x parallel instances. This value can be overridden by specific operations using
    * [[DataSet.setParallelism]].
+   * @deprecated Please use [[setParallelism]]
    */
-  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  @deprecated
+  def setDegreeOfParallelism(parallelism: Int): Unit = {
+    setParallelism(parallelism)
   }
 
   /**
   * Returns the default degree of parallelism for this execution environment. Note that this
    * value can be overridden by individual operations using [[DataSet.setParallelism]
    */
-  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
-  
+  def setParallelism(parallelism: Int): Unit = {
+    javaEnv.setParallelism(parallelism)
+  }
+
+  /**
+   * Returns the default parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataSet.setParallelism]]
+   * @deprecated Please use [[getParallelism]]
+   */
+  @deprecated
+  def getDegreeOfParallelism = javaEnv.getParallelism
+
+  /**
+   * Returns the default parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataSet.setParallelism]]
+   */
+  def getParallelism = javaEnv.getParallelism
+
   /**
    * Sets the number of times that failed tasks are re-executed. A value of zero
    * effectively disables fault tolerance. A value of "-1" indicates that the system

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index f647e5d..8f1839d 100644
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -127,9 +127,17 @@ public class SpargelIteration {
 	public Operator<?> getOutput() {
 		return this.iteration;
 	}
-	
-	public void setDegreeOfParallelism(int dop) {
-		this.iteration.setDegreeOfParallelism(dop);
+
+	/**
+	 * @deprecated Please use {@link #setParallelism}
+	 */
+	@Deprecated
+	public void setDegreeOfParallelism(int parallelism) {
+		setParallelism(parallelism);
+	}
+
+	public void setParallelism(int parallelism) {
+		this.iteration.setParallelism(parallelism);
 	}
 	
 	public void setNumberOfIterations(int iterations) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 994ff15..dc1826b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -97,16 +97,30 @@ public abstract class StreamExecutionEnvironment {
 	 * default. Operations can individually override this value to use a
 	 * specific degree of parallelism.
 	 * 
-	 * @return The degree of parallelism used by operations, unless they
+	 * @return The parallelism used by operations, unless they
 	 *         override that value.
+	 * @deprecated Please use {@link #getParallelism}
 	 */
+	@Deprecated
 	public int getDegreeOfParallelism() {
-		return config.getDegreeOfParallelism();
+		return getParallelism();
 	}
 
 	/**
-	 * Sets the degree of parallelism (DOP) for operations executed through this
-	 * environment. Setting a DOP of x here will cause all operators (such as
+	 * Gets the parallelism with which operation are executed by
+	 * default. Operations can individually override this value to use a
+	 * specific parallelism.
+	 *
+	 * @return The parallelism used by operations, unless they
+	 *         override that value.
+	 */
+	public int getParallelism() {
+		return config.getParallelism();
+	}
+
+	/**
+	 * Sets the parallelism for operations executed through this
+	 * environment. Setting a parallelism of x here will cause all operators (such as
 	 * map, batchReduce) to run with x parallel instances. This method overrides
 	 * the default parallelism for this environment. The
 	 * {@link LocalStreamEnvironment} uses by default a value equal to the
@@ -114,14 +128,32 @@ public abstract class StreamExecutionEnvironment {
 	 * program via the command line client from a JAR file, the default degree
 	 * of parallelism is the one configured for that setup.
 	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism
+	 * @param parallelism The parallelism
+	 * @deprecated Please use {@link #setParallelism}
+	 */
+	@Deprecated
+	public StreamExecutionEnvironment setDegreeOfParallelism(int parallelism) {
+		return setParallelism(parallelism);
+	}
+
+	/**
+	 * Sets the parallelism for operations executed through this
+	 * environment. Setting a parallelism of x here will cause all operators (such as
+	 * map, batchReduce) to run with x parallel instances. This method overrides
+	 * the default parallelism for this environment. The
+	 * {@link LocalStreamEnvironment} uses by default a value equal to the
+	 * number of hardware contexts (CPU cores / threads). When executing the
+	 * program via the command line client from a JAR file, the default degree
+	 * of parallelism is the one configured for that setup.
+	 *
+	 * @param parallelism
+	 *            The parallelism
 	 */
-	public StreamExecutionEnvironment setDegreeOfParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+	public StreamExecutionEnvironment setParallelism(int parallelism) {
+		if (parallelism < 1) {
+			throw new IllegalArgumentException("parallelism must be at least one.");
 		}
-		config.setDegreeOfParallelism(degreeOfParallelism);
+		config.setParallelism(parallelism);
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d994d2e6/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2208388..598b590 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -37,16 +37,36 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
    * x parallel instances. This value can be overridden by specific operations using
    * [[DataStream.setParallelism]].
+   * @deprecated Please use [[setParallelism]]
    */
+  @deprecated
   def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    javaEnv.setParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Sets the parallelism for operations executed through this environment.
+   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+   * with x parallel instances. This value can be overridden by specific operations using
+   * [[DataStream.setParallelism]].
+   */
+  def setParallelism(parallelism: Int): Unit = {
+    javaEnv.setParallelism(parallelism)
   }
 
   /**
-   * Returns the default degree of parallelism for this execution environment. Note that this
+   * Returns the default parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * @deprecated Please use [[getParallelism]]
+   */
+  @deprecated
+  def getDegreeOfParallelism = javaEnv.getParallelism
+
+  /**
+   * Returns the default parallelism for this execution environment. Note that this
    * value can be overridden by individual operations using [[DataStream.setParallelism]]
    */
-  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+  def getParallelism = javaEnv.getParallelism
 
   /**
    * Sets the maximum time frequency (milliseconds) for the flushing of the


Mime
View raw message