flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [05/10] flink git commit: [FLINK-6742] Improve savepoint migration failure error message
Date Sun, 25 Jun 2017 11:50:26 GMT
[FLINK-6742] Improve savepoint migration failure error message


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bbfe029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bbfe029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bbfe029

Branch: refs/heads/release-1.3
Commit: 2bbfe0292c13d875b531a6c168ea78bfc7f21f0b
Parents: 3d5cee2
Author: zentol <chesnay@apache.org>
Authored: Wed Jun 7 12:03:21 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Sun Jun 25 08:55:09 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/savepoint/SavepointV2.java        | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bbfe029/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 1b2963d..5e46f93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -168,10 +168,27 @@ public class SavepointV2 implements Savepoint {
 				expandedToLegacyIds = true;
 			}
 
+			if (jobVertex == null) {
+				throw new IllegalStateException(
+					"Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+					"changed through removal of a stateful operator or modification of a chain containing a stateful " +
+					"operator.");
+			}
+
 			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
 
 			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
-				SubtaskState subtaskState = taskState.getState(subtaskIndex);
+				SubtaskState subtaskState;
+				try {
+					subtaskState = taskState.getState(subtaskIndex);
+				} catch (Exception e) {
+					throw new IllegalStateException(
+						"Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " +
+						"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+						"to the parallelism of stateful operators.",
+						e);
+				}
 
 				if (subtaskState == null) {
 					continue;


Mime
View raw message