flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] flink git commit: Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig"
Date Tue, 04 Apr 2017 11:56:47 GMT
Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig"

This reverts commit d3b275f4b7d49b67013e26d1f29a065d3131c664.

This fix was causing more problems than it was solving.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd98e8b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd98e8b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd98e8b8

Branch: refs/heads/release-1.2
Commit: fd98e8b8c059b54f82d80d251b72b80459f8fee5
Parents: 04a1d6b
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Apr 3 18:40:05 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Apr 4 13:42:17 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/ExecutionConfig.java    |  9 ---------
 .../runtime/executiongraph/ExecutionJobVertex.java  | 11 +++--------
 .../runtime/executiongraph/ExecutionVertex.java     |  4 ++--
 .../runtime/state/KeyGroupRangeAssignment.java      | 16 ++++++++++++----
 .../api/environment/StreamExecutionEnvironment.java |  5 +++--
 .../streaming/api/graph/StreamGraphGenerator.java   |  6 +++---
 6 files changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 14245ed..32ea0a3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -83,15 +83,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	public static final int PARALLELISM_UNKNOWN = -2;
 
-	/**
-	 * The default lower bound for max parallelism if nothing was configured by the user. We
have this so allow users
-	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
-	 */
-	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
-
-	/** The (inclusive) upper bound for max parallelism */
-	public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
-
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 59f9986..e8664f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -222,14 +221,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex,
Archiveable
 	}
 
 	private void setMaxParallelismInternal(int maxParallelism) {
-		if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-			maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
-		}
-
 		Preconditions.checkArgument(maxParallelism > 0
-						&& maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
-				"Overriding max parallelism is not in valid bounds (1..%s), found: %s",
-				ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
+						&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+				"Overriding max parallelism is not in valid bounds (1.." +
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism);
 
 		this.maxParallelism = maxParallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index cde1f6c..09497e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
@@ -41,6 +40,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -609,7 +609,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				//TODO this case only exists for test, currently there has to be exactly one consumer
in real jobs!
 				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
 						partition,
-						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
 						lazyScheduling));
 			} else {
 				Preconditions.checkState(1 == consumers.size(),

http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index bf0611b..62bf3f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -18,12 +18,20 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 
 public final class KeyGroupRangeAssignment {
 
+	/**
+	 * The default lower bound for max parallelism if nothing was configured by the user. We
have this so allow users
+	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+	 */
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+	/** The (inclusive) upper bound for max parallelism */
+	public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
+
 	private KeyGroupRangeAssignment() {
 		throw new AssertionError();
 	}
@@ -122,13 +130,13 @@ public final class KeyGroupRangeAssignment {
 		return Math.min(
 				Math.max(
 						MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
-						ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
-				ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM);
+						DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+				UPPER_BOUND_MAX_PARALLELISM);
 	}
 
 	public static void checkParallelismPreconditions(int parallelism) {
 		Preconditions.checkArgument(parallelism > 0
-						&& parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+						&& parallelism <= UPPER_BOUND_MAX_PARALLELISM,
 				"Operator parallelism not within bounds: " + parallelism);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 640915c..6ac3622 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -184,9 +185,9 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
 		Preconditions.checkArgument(maxParallelism > 0 &&
-						maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+						maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
 				"maxParallelism is out of bounds 0 < maxParallelism <= " +
-						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 
 		config.setMaxParallelism(maxParallelism);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd98e8b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index e796629..333e4f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -78,8 +78,8 @@ public class StreamGraphGenerator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
 
-	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
-	public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+	public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
 	// The StreamGraph that is being built, this is initialized at the beginning.
 	private StreamGraph streamGraph;


Mime
View raw message