flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-3020][streaming] set number of task slots to maximum parallelism in local execution
Date Mon, 23 Nov 2015 17:34:49 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 968d8b496 -> f963425ea


[FLINK-3020][streaming] set number of task slots to maximum parallelism in local execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f963425e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f963425e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f963425e

Branch: refs/heads/release-0.10
Commit: f963425eac7b80cb35cfbf13d0ccfd98e0c5ce29
Parents: 968d8b4
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Nov 17 15:04:31 2015 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Nov 23 18:34:35 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobgraph/JobGraph.java     | 12 ++++++++++++
 .../api/environment/LocalStreamEnvironment.java         |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f963425e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index a64d63c..566e44f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -539,6 +539,18 @@ public class JobGraph implements Serializable {
 		}
 	}
 
+	/**
+	 * Gets the maximum parallelism of all operations in this job graph.
+	 * @return The maximum parallelism of this job graph
+	 */
+	public int getMaximumParallelism() {
+		int maxParallelism = -1;
+		for (JobVertex vertex : taskVertices.values()) {
+			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
+		}
+		return maxParallelism;
+	}
+
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/f963425e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 35b3e59..a2cfbc0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -87,7 +87,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		configuration.addAll(jobGraph.getJobConfiguration());
 
 		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 		
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);


Mime
View raw message