flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [14/21] flink git commit: [FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism
Date Sun, 25 Jun 2017 06:44:57 GMT
[FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism

This closes #4125.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c736ba2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c736ba2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c736ba2e

Branch: refs/heads/master
Commit: c736ba2ef15e9e81a54a3fc02ccffadcbf594767
Parents: 7216407
Author: zhangminglei <zml13856086071@163.com>
Authored: Tue Jun 20 19:43:44 2017 +0800
Committer: zentol <chesnay@apache.org>
Committed: Fri Jun 23 14:14:30 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/StateAssignmentOperation.java   | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c736ba2e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 1042d5a..5712ea1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -464,6 +464,14 @@ public class StateAssignmentOperation {
 	private static void checkParallelismPreconditions(OperatorState operatorState, ExecutionJobVertex executionJobVertex) {
 		//----------------------------------------max parallelism preconditions-------------------------------------
 
+		if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
+			throw new IllegalStateException("The state for task " + executionJobVertex.getJobVertexId() +
+				" can not be restored. The maximum parallelism (" + operatorState.getMaxParallelism() +
+				") of the restored state is lower than the configured parallelism (" + executionJobVertex.getParallelism() +
+				"). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism."
+			);
+		}
+
 		// check that the number of key groups have not changed or if we need to override it to satisfy the restored state
 		if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
 


Mime
View raw message