Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D93CB200D50 for ; Mon, 4 Dec 2017 12:42:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D79DF160C1B; Mon, 4 Dec 2017 11:42:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CEF85160BF9 for ; Mon, 4 Dec 2017 12:42:52 +0100 (CET) Received: (qmail 23153 invoked by uid 500); 4 Dec 2017 11:42:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 23144 invoked by uid 99); 4 Dec 2017 11:42:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Dec 2017 11:42:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E507DDFF89; Mon, 4 Dec 2017 11:42:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chesnay@apache.org To: commits@flink.apache.org Message-Id: <97f66488f4a14044920962b4a20210ea@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-7595] [Savepoints] Allow removing stateless operators Date: Mon, 4 Dec 2017 11:42:51 +0000 (UTC) archived-at: Mon, 04 Dec 2017 11:42:54 -0000 Repository: flink Updated Branches: refs/heads/master f4e4cd6cb -> 80348d653 [FLINK-7595] [Savepoints] Allow removing stateless operators This closes #4651. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80348d65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80348d65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80348d65 Branch: refs/heads/master Commit: 80348d653b48e1b7d6a0b9275dbfa510eaea151f Parents: f4e4cd6 Author: zentol Authored: Wed Sep 6 15:38:20 2017 +0200 Committer: zentol Committed: Mon Dec 4 12:42:24 2017 +0100 ---------------------------------------------------------------------- .../checkpoint/savepoint/SavepointLoader.java | 12 +++- .../savepoint/SavepointLoaderTest.java | 16 ++++-- .../AbstractOperatorRestoreTestBase.java | 11 +++- ...AbstractNonKeyedOperatorRestoreTestBase.java | 7 ++- .../ChainLengthStatelessDecreaseTest.java | 59 ++++++++++++++++++++ 5 files changed, 96 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 38db7c2..31d9124 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import 
org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -120,14 +121,19 @@ public class SavepointLoader { } else if (allowNonRestoredState) { LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID()); } else { - String msg = String.format("Failed to rollback to savepoint %s. " + + for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) { + if (operatorSubtaskState.hasState()) { + String msg = String.format("Failed to rollback to savepoint %s. " + "Cannot map savepoint state for operator %s to the new program, " + "because the operator is not available in the new program. If " + "you want to allow to skip this, you can set the --allowNonRestoredState " + "option on the CLI.", - savepointPath, operatorState.getOperatorID()); + savepointPath, operatorState.getOperatorID()); - throw new IllegalStateException(msg); + throw new IllegalStateException(msg); + } + } + LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index 331621d..a461569 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -22,9 +22,13 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; +import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -59,10 +63,14 @@ public class SavepointLoaderTest { JobVertexID jobVertexID = new JobVertexID(); OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID); - OperatorState state = mock(OperatorState.class); - when(state.getParallelism()).thenReturn(parallelism); - when(state.getOperatorID()).thenReturn(operatorID); - when(state.getMaxParallelism()).thenReturn(parallelism); + OperatorSubtaskState subtaskState = new OperatorSubtaskState( + new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])), + null, + null, + null); + + OperatorState state = new OperatorState(operatorID, parallelism, parallelism); + state.putState(0, subtaskState); Map taskStates = new HashMap<>(); taskStates.put(operatorID, state); http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 7488b62..c86f21f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -89,6 +89,15 @@ public abstract class 
AbstractOperatorRestoreTestBase extends TestLogger { private static ActorGateway taskManager = null; private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS); + private final boolean allowNonRestoredState; + + protected AbstractOperatorRestoreTestBase() { + this(true); + } + + protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) { + this.allowNonRestoredState = allowNonRestoredState; + } @BeforeClass public static void beforeClass() { @@ -238,7 +247,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { private void restoreJob(String savepointPath) throws Exception { JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE); - jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true)); + jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState)); Object msg; Object result; http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java index f07bd4d..c100dc9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java @@ -51,7 +51,12 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp "nonKeyed-flink1.3"); } - public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) { + protected 
AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) { + this.savepointPath = savepointPath; + } + + protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, boolean allowNonRestoredState) { + super(allowNonRestoredState); this.savepointPath = savepointPath; } http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java new file mode 100644 index 0000000..fb4dcf5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operators is restored if a topology change removes an operator from a chain. + * + *

This test specifically checks that stateless operators can be removed even if all states from the previous job
 * must be restored: the job is restored with {@code allowNonRestoredState} disabled, so restoring only succeeds
 * because the removed (stateless) map operator contributes no state to the savepoint.
 */
public class ChainLengthStatelessDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {

	/**
	 * Creates the test for the given savepoint.
	 *
	 * @param savepointPath savepoint from which the modified job is restored
	 */
	public ChainLengthStatelessDecreaseTest(String savepointPath) {
		// Disallow non-restored state: every piece of state in the savepoint must be mapped to an
		// operator of the modified job, otherwise the restore fails.
		super(savepointPath, false);
	}

	/**
	 * Builds the modified topology, which lacks the stateless map of the original job.
	 *
	 * @param env environment on which the restored job is defined
	 */
	@Override
	public void createRestoredJob(StreamExecutionEnvironment env) {
		/*
		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
		 */
		DataStream source = createSource(env, ExecutionMode.RESTORE);

		SingleOutputStreamOperator first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
		first.startNewChain();

		SingleOutputStreamOperator second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
		second.startNewChain();

		// The stateless Map of the original job is intentionally omitted, so the third stateful map
		// follows the second one directly. Its result is not needed further, hence it is not stored.
		createThirdStatefulMap(ExecutionMode.RESTORE, second);
	}
}