Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C4282200CB2 for ; Sun, 25 Jun 2017 13:50:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C3532160BD8; Sun, 25 Jun 2017 11:50:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E60EA160BFC for ; Sun, 25 Jun 2017 13:50:26 +0200 (CEST) Received: (qmail 56841 invoked by uid 500); 25 Jun 2017 11:50:26 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 56559 invoked by uid 99); 25 Jun 2017 11:50:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 25 Jun 2017 11:50:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1F641E96B4; Sun, 25 Jun 2017 11:50:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chesnay@apache.org To: commits@flink.apache.org Date: Sun, 25 Jun 2017 11:50:26 -0000 Message-Id: <82a3e3af1d434e0a9cbcaaa0a36aaf56@git.apache.org> In-Reply-To: <0c5dceb3951e4e3887413a7d4637640f@git.apache.org> References: <0c5dceb3951e4e3887413a7d4637640f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/10] flink git commit: [FLINK-6742] Improve savepoint migration failure error message archived-at: Sun, 25 Jun 2017 11:50:27 -0000 [FLINK-6742] Improve savepoint migration failure error message Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bbfe029 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bbfe029 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bbfe029 Branch: refs/heads/release-1.3 Commit: 2bbfe0292c13d875b531a6c168ea78bfc7f21f0b Parents: 3d5cee2 Author: zentol Authored: Wed Jun 7 12:03:21 2017 +0200 Committer: zentol Committed: Sun Jun 25 08:55:09 2017 +0200 ---------------------------------------------------------------------- .../checkpoint/savepoint/SavepointV2.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2bbfe029/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index 1b2963d..5e46f93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -168,10 +168,27 @@ public class SavepointV2 implements Savepoint { expandedToLegacyIds = true; } + if (jobVertex == null) { + throw new IllegalStateException( + "Could not find task for state with ID " + taskState.getJobVertexID() + ". " + + "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " + + "changed through removal of a stateful operator or modification of a chain containing a stateful " + + "operator."); + } + List operatorIDs = jobVertex.getOperatorIDs(); for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) { - SubtaskState subtaskState = taskState.getState(subtaskIndex); + SubtaskState subtaskState; + try { + subtaskState = taskState.getState(subtaskIndex); + } catch (Exception e) { + throw new IllegalStateException( + "Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " + + "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " + + "to the parallelism of stateful operators.", + e); + } if (subtaskState == null) { continue;