From commits-return-43043-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Sep 1 06:30:00 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id CE38E180634 for ; Wed, 1 Sep 2021 08:30:00 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 7B0876289C for ; Wed, 1 Sep 2021 06:29:27 +0000 (UTC) Received: (qmail 68492 invoked by uid 500); 1 Sep 2021 06:29:26 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 68475 invoked by uid 99); 1 Sep 2021 06:29:26 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Sep 2021 06:29:26 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 9354381EFC; Wed, 1 Sep 2021 06:29:26 +0000 (UTC) Date: Wed, 01 Sep 2021 06:29:23 +0000 To: "commits@flink.apache.org" Subject: [flink] 01/11: [FLINK-23854][datastream] Expose the restored checkpoint id in ManagedInitializationContext. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: arvid@apache.org In-Reply-To: <163047776084.23457.752556586781185374@gitbox.apache.org> References: <163047776084.23457.752556586781185374@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: flink X-Git-Refname: refs/heads/release-1.14 X-Git-Reftype: branch X-Git-Rev: 3076a23b1c7b14307160eba31af40bb4f4d72863 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210901062926.9354381EFC@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git commit 3076a23b1c7b14307160eba31af40bb4f4d72863 Author: Arvid Heise AuthorDate: Tue Aug 24 16:24:01 2021 +0200 [FLINK-23854][datastream] Expose the restored checkpoint id in ManagedInitializationContext. --- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 2 +- .../kafka/FlinkKafkaConsumerBaseTest.java | 6 +++ .../PrioritizedOperatorSubtaskState.java | 44 ++++++++++++++++------ .../checkpoint/StateAssignmentOperation.java | 12 ++---- .../flink/runtime/executiongraph/Execution.java | 2 +- .../runtime/executiongraph/ExecutionVertex.java | 4 +- .../state/ManagedInitializationContext.java | 13 +++++-- .../state/StateInitializationContextImpl.java | 19 ++++++++-- .../flink/runtime/state/TaskStateManagerImpl.java | 7 +++- .../runtime/state/TaskStateManagerImplTest.java | 5 +-- .../flink/runtime/state/TestTaskStateManager.java | 3 +- .../api/operators/StreamOperatorStateContext.java | 14 ++++++- .../api/operators/StreamOperatorStateHandler.java | 5 ++- .../operators/StreamTaskStateInitializerImpl.java | 15 +++++--- .../api/operators/SourceOperatorTest.java | 2 +- .../StateInitializationContextImplTest.java | 4 +- .../StreamTaskStateInitializerImplTest.java | 4 +- .../utils/MockFunctionInitializationContext.java | 7 ++++ .../source/SourceOperatorEventTimeTest.java | 2 +- .../runtime/tasks/RestoreStreamTaskTest.java | 41 ++++++++++++++++---- .../streaming/runtime/tasks/StreamTaskTest.java | 6 +++ 21 files changed, 158 insertions(+), 59 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java index 17142b2..5b5aa02 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java @@ -369,7 +369,7 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase { static StateInitializationContextImpl buildInitCtx(boolean restored) { return new StateInitializationContextImpl( - restored, + restored ? 1L : null, new DefaultOperatorStateBackend( new ExecutionConfig(), new CloseableRegistry(), diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index d87f73a..add4d2f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -84,6 +84,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -1520,6 +1521,11 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { } @Override + public OptionalLong getRestoredCheckpointId() { + return isRestored ? OptionalLong.of(1L) : OptionalLong.empty(); + } + + @Override public OperatorStateStore getOperatorStateStore() { return operatorStateStore; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java index fc4f007..ef9bcd0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.StateObject; import org.apache.commons.lang3.BooleanUtils; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; @@ -47,10 +48,13 @@ import java.util.function.Function; @Internal public class PrioritizedOperatorSubtaskState { + private static final OperatorSubtaskState EMPTY_JM_STATE_STATE = + OperatorSubtaskState.builder().build(); + /** Singleton instance for an empty, non-restored operator state. */ private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE = new PrioritizedOperatorSubtaskState.Builder( - OperatorSubtaskState.builder().build(), Collections.emptyList(), false) + EMPTY_JM_STATE_STATE, Collections.emptyList(), null) .build(); /** List of prioritized snapshot alternatives for managed operator state. */ @@ -70,8 +74,8 @@ public class PrioritizedOperatorSubtaskState { private final List> prioritizedResultSubpartitionState; - /** Signal flag if this represents state for a restored operator. */ - private final boolean restored; + /** Checkpoint id for a restored operator or null if not restored. */ + private final @Nullable Long restoredCheckpointId; PrioritizedOperatorSubtaskState( @Nonnull List> prioritizedManagedKeyedState, @@ -86,7 +90,7 @@ public class PrioritizedOperatorSubtaskState { @Nonnull List> prioritizedResultSubpartitionState, - boolean restored) { + @Nullable Long restoredCheckpointId) { this.prioritizedManagedOperatorState = prioritizedManagedOperatorState; this.prioritizedRawOperatorState = prioritizedRawOperatorState; @@ -94,7 +98,7 @@ public class PrioritizedOperatorSubtaskState { this.prioritizedRawKeyedState = prioritizedRawKeyedState; this.prioritizedInputChannelState = prioritizedInputChannelState; this.prioritizedResultSubpartitionState = prioritizedResultSubpartitionState; - this.restored = restored; + this.restoredCheckpointId = restoredCheckpointId; } // ----------------------------------------------------------------------------------------------------------------- @@ -192,7 +196,17 @@ public class PrioritizedOperatorSubtaskState { * snapshots. */ public boolean isRestored() { - return restored; + return restoredCheckpointId != null; + } + + /** + * Returns the checkpoint id if this was created for a restored operator, null otherwise. + * Restored operators are operators that participated in a previous checkpoint, even if they did + * not emit any state snapshots. + */ + @Nullable + public Long getRestoredCheckpointId() { + return restoredCheckpointId; } private static StateObjectCollection lastElement( @@ -208,6 +222,12 @@ public class PrioritizedOperatorSubtaskState { return EMPTY_NON_RESTORED_INSTANCE; } + public static PrioritizedOperatorSubtaskState empty(long restoredCheckpointId) { + return new PrioritizedOperatorSubtaskState.Builder( + EMPTY_JM_STATE_STATE, Collections.emptyList(), restoredCheckpointId) + .build(); + } + /** A builder for PrioritizedOperatorSubtaskState. */ @Internal public static class Builder { @@ -218,23 +238,23 @@ public class PrioritizedOperatorSubtaskState { /** (Local) alternatives to the job manager state. */ @Nonnull private final List alternativesByPriority; - /** Flag if the states have been restored. */ - private final boolean restored; + /** Checkpoint id of the restored checkpoint or null if not restored. */ + private final @Nullable Long restoredCheckpointId; public Builder( @Nonnull OperatorSubtaskState jobManagerState, @Nonnull List alternativesByPriority) { - this(jobManagerState, alternativesByPriority, true); + this(jobManagerState, alternativesByPriority, null); } public Builder( @Nonnull OperatorSubtaskState jobManagerState, @Nonnull List alternativesByPriority, - boolean restored) { + @Nullable Long restoredCheckpointId) { this.jobManagerState = jobManagerState; this.alternativesByPriority = alternativesByPriority; - this.restored = restored; + this.restoredCheckpointId = restoredCheckpointId; } public PrioritizedOperatorSubtaskState build() { @@ -290,7 +310,7 @@ public class PrioritizedOperatorSubtaskState { jobManagerState.getResultSubpartitionState(), resultSubpartitionStateAlternatives, eqStateApprover(ResultSubpartitionStateHandle::getInfo)), - restored); + restoredCheckpointId); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index c409340..69fc65c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -239,7 +239,6 @@ public class StateAssignmentOperation { int subTaskIndex, Execution currentExecutionAttempt) { TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size(), false); - boolean statelessTask = true; for (OperatorIDPair operatorID : operatorIDs) { OperatorInstanceID instanceID = @@ -247,18 +246,13 @@ public class StateAssignmentOperation { OperatorSubtaskState operatorSubtaskState = assignment.getSubtaskState(instanceID); - if (operatorSubtaskState.hasState()) { - statelessTask = false; - } taskState.putSubtaskStateByOperatorID( operatorID.getGeneratedOperatorID(), operatorSubtaskState); } - if (!statelessTask) { - JobManagerTaskRestore taskRestore = - new JobManagerTaskRestore(restoreCheckpointId, taskState); - currentExecutionAttempt.setInitialState(taskRestore); - } + JobManagerTaskRestore taskRestore = + new JobManagerTaskRestore(restoreCheckpointId, taskState); + currentExecutionAttempt.setInitialState(taskRestore); } public void checkParallelismPreconditions(TaskStateAssignment taskStateAssignment) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 79896b2..d29bfc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -356,7 +356,7 @@ public class Execution * * @param taskRestore information to restore the state */ - public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { + public void setInitialState(JobManagerTaskRestore taskRestore) { this.taskRestore = taskRestore; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 6502ad9..265101b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -342,7 +342,9 @@ public class ExecutionVertex * that the execution attempt will resume. */ public Optional getPreferredLocationBasedOnState() { - if (currentExecution.getTaskRestore() != null) { + // only restore to same execution if it has state + if (currentExecution.getTaskRestore() != null + && currentExecution.getTaskRestore().getTaskStateSnapshot().hasState()) { return Optional.ofNullable(getLatestPriorLocation()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index e7465f5..75433a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; +import java.util.OptionalLong; + /** * This interface provides a context in which operators can initialize by registering to managed * state (i.e. state that is managed by state backends). @@ -33,11 +35,16 @@ import org.apache.flink.api.common.state.OperatorStateStore; */ public interface ManagedInitializationContext { + /** Returns true, if state was restored from the snapshot of a previous execution. */ + default boolean isRestored() { + return getRestoredCheckpointId().isPresent(); + } + /** - * Returns true, if state was restored from the snapshot of a previous execution. This returns - * always false for stateless tasks. + * Returns id of the restored checkpoint, if state was restored from the snapshot of a previous + * execution. */ - boolean isRestored(); + OptionalLong getRestoredCheckpointId(); /** Returns an interface that allows for registering operator state with the backend. */ OperatorStateStore getOperatorStateStore(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index f5710ae..7ff901a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -21,11 +21,15 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; +import javax.annotation.Nullable; + +import java.util.OptionalLong; + /** Default implementation of {@link StateInitializationContext}. */ public class StateInitializationContextImpl implements StateInitializationContext { /** Signal whether any state to restore was found */ - private final boolean restored; + private final @Nullable Long restoredCheckpointId; private final OperatorStateStore operatorStateStore; @@ -35,13 +39,13 @@ public class StateInitializationContextImpl implements StateInitializationContex private final Iterable rawOperatorStateInputs; public StateInitializationContextImpl( - boolean restored, + @Nullable Long restoredCheckpointId, OperatorStateStore operatorStateStore, KeyedStateStore keyedStateStore, Iterable rawKeyedStateInputs, Iterable rawOperatorStateInputs) { - this.restored = restored; + this.restoredCheckpointId = restoredCheckpointId; this.operatorStateStore = operatorStateStore; this.keyedStateStore = keyedStateStore; this.rawOperatorStateInputs = rawOperatorStateInputs; @@ -50,7 +54,14 @@ public class StateInitializationContextImpl implements StateInitializationContex @Override public boolean isRestored() { - return restored; + return restoredCheckpointId != null; + } + + @Override + public OptionalLong getRestoredCheckpointId() { + return restoredCheckpointId == null + ? OptionalLong.empty() + : OptionalLong.of(restoredCheckpointId); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index cf9fae6..02f21e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -176,7 +176,8 @@ public class TaskStateManagerImpl implements TaskStateManager { jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID); if (jobManagerSubtaskState == null) { - return PrioritizedOperatorSubtaskState.emptyNotRestored(); + return PrioritizedOperatorSubtaskState.empty( + jobManagerTaskRestore.getRestoreCheckpointId()); } long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId(); @@ -208,7 +209,9 @@ public class TaskStateManagerImpl implements TaskStateManager { PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder( - jobManagerSubtaskState, alternativesByPriority, true); + jobManagerSubtaskState, + alternativesByPriority, + jobManagerTaskRestore.getRestoreCheckpointId()); return builder.build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java index 2d2a349..223f673 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java @@ -158,9 +158,8 @@ public class TaskStateManagerImplTest extends TestLogger { Assert.assertTrue(prioritized_1.isRestored()); Assert.assertTrue(prioritized_2.isRestored()); - Assert.assertFalse(prioritized_3.isRestored()); - Assert.assertFalse( - taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored()); + Assert.assertTrue(prioritized_3.isRestored()); + Assert.assertTrue(taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored()); // checks for operator 1. Iterator> prioritizedManagedKeyedState_1 = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java index 75fb52f..a52b15a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java @@ -184,7 +184,8 @@ public class TestTaskStateManager implements TaskStateManager { } } PrioritizedOperatorSubtaskState.Builder builder = - new PrioritizedOperatorSubtaskState.Builder(jmOpState, tmStateCollection); + new PrioritizedOperatorSubtaskState.Builder( + jmOpState, tmStateCollection, reportedCheckpointId); return builder.build(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 52a4d98..695129c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.util.CloseableIterable; +import java.util.OptionalLong; + /** * This interface represents a context from which a stream operator can initialize everything * connected to state such as e.g. backends, raw state, and timer service manager. @@ -31,9 +33,17 @@ import org.apache.flink.util.CloseableIterable; public interface StreamOperatorStateContext { /** - * Returns true, the states provided by this context are restored from a checkpoint/savepoint. + * Returns true if the states provided by this context are restored from a checkpoint/savepoint. + */ + default boolean isRestored() { + return getRestoredCheckpointId().isPresent(); + } + + /** + * Returns non-empty if the states provided by this context are restored from a + * checkpoint/savepoint. */ - boolean isRestored(); + OptionalLong getRestoredCheckpointId(); /** Returns the operator state backend for the stream operator. */ OperatorStateBackend operatorStateBackend(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index 70b73c7..7311a72 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -62,6 +62,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Optional; +import java.util.OptionalLong; import static org.apache.flink.util.Preconditions.checkState; @@ -106,10 +107,10 @@ public class StreamOperatorStateHandler { context.rawOperatorStateInputs(); try { + OptionalLong checkpointId = context.getRestoredCheckpointId(); StateInitializationContext initializationContext = new StateInitializationContextImpl( - context.isRestored(), // information whether we restore or start for - // the first time + checkpointId.isPresent() ? checkpointId.getAsLong() : null, operatorStateBackend, // access to operator state backend keyedStateStore, // access to keyed state backend keyedStateInputs, // access to keyed state stream diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 31e567f..3df0b50 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -63,6 +63,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.OptionalLong; import java.util.stream.StreamSupport; import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException; @@ -219,7 +220,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( - prioritizedOperatorSubtaskStates.isRestored(), + prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, timeServiceManager, @@ -635,7 +636,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize private static class StreamOperatorStateContextImpl implements StreamOperatorStateContext { - private final boolean restored; + private final @Nullable Long restoredCheckpointId; private final OperatorStateBackend operatorStateBackend; private final CheckpointableKeyedStateBackend keyedStateBackend; @@ -645,14 +646,14 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize private final CloseableIterable rawKeyedStateInputs; StreamOperatorStateContextImpl( - boolean restored, + @Nullable Long restoredCheckpointId, OperatorStateBackend operatorStateBackend, CheckpointableKeyedStateBackend keyedStateBackend, InternalTimeServiceManager internalTimeServiceManager, CloseableIterable rawOperatorStateInputs, CloseableIterable rawKeyedStateInputs) { - this.restored = restored; + this.restoredCheckpointId = restoredCheckpointId; this.operatorStateBackend = operatorStateBackend; this.keyedStateBackend = keyedStateBackend; this.internalTimeServiceManager = internalTimeServiceManager; @@ -661,8 +662,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } @Override - public boolean isRestored() { - return restored; + public OptionalLong getRestoredCheckpointId() { + return restoredCheckpointId == null + ? OptionalLong.empty() + : OptionalLong.of(restoredCheckpointId); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index f6fb558..3bf4671 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -222,7 +222,7 @@ public class SourceOperatorTest { // Crate the state context. OperatorStateStore operatorStateStore = createOperatorStateStore(); StateInitializationContext stateContext = - new StateInitializationContextImpl(false, operatorStateStore, null, null, null); + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); // Update the context. stateContext diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index fbf767f..37bcf69 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -70,6 +70,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import static org.mockito.Mockito.mock; @@ -217,9 +218,10 @@ public class StateInitializationContextImplTest { 1.0, false); + OptionalLong restoredCheckpointId = stateContext.getRestoredCheckpointId(); this.initializationContext = new StateInitializationContextImpl( - stateContext.isRestored(), + restoredCheckpointId.isPresent() ? restoredCheckpointId.getAsLong() : null, stateContext.operatorStateBackend(), mock(KeyedStateStore.class), stateContext.rawKeyedStateInputs(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 3056b15..83a608c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -63,6 +63,7 @@ import javax.annotation.Nonnull; import java.io.Closeable; import java.util.Collection; import java.util.Collections; +import java.util.OptionalLong; import java.util.Random; import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle; @@ -203,7 +204,7 @@ public class StreamTaskStateInitializerImplTest { taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); JobManagerTaskRestore jobManagerTaskRestore = - new JobManagerTaskRestore(0L, taskStateSnapshot); + new JobManagerTaskRestore(42L, taskStateSnapshot); StreamTaskStateInitializer streamTaskStateManager = streamTaskStateManager(mockingBackend, jobManagerTaskRestore, false); @@ -236,6 +237,7 @@ public class StreamTaskStateInitializerImplTest { stateContext.rawOperatorStateInputs(); Assert.assertTrue("Expected the context to be restored", stateContext.isRestored()); + Assert.assertEquals(OptionalLong.of(42L), stateContext.getRestoredCheckpointId()); Assert.assertNotNull(operatorStateBackend); Assert.assertNotNull(keyedStateBackend); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java index 642328c..c257320 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators.collect.utils; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.runtime.state.FunctionInitializationContext; +import java.util.OptionalLong; + /** A {@link FunctionInitializationContext} for testing purpose. */ public class MockFunctionInitializationContext implements FunctionInitializationContext { @@ -35,6 +37,11 @@ public class MockFunctionInitializationContext implements FunctionInitialization } @Override + public OptionalLong getRestoredCheckpointId() { + throw new UnsupportedOperationException(); + } + + @Override public MockOperatorStateStore getOperatorStateStore() { return operatorStateStore; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index 394c5e2..39f5de4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -238,7 +238,7 @@ public class SourceOperatorEventTimeTest { new CloseableRegistry()); final StateInitializationContext stateContext = - new StateInitializationContextImpl(false, operatorStateStore, null, null, null); + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); final SourceOperator sourceOperator = new TestingSourceOperator<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java index 0afc853..49cbff1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java @@ -45,11 +45,13 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertEquals; /** @@ -58,7 +60,7 @@ import static org.junit.Assert.assertEquals; */ public class RestoreStreamTaskTest extends TestLogger { - private static final Set RESTORED_OPERATORS = ConcurrentHashMap.newKeySet(); + private static final Map RESTORED_OPERATORS = new ConcurrentHashMap(); @Before public void setup() { @@ -91,7 +93,11 @@ public class RestoreStreamTaskTest extends TestLogger { Optional.of(restore)); assertEquals( - new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS); + new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), + RESTORED_OPERATORS.keySet()); + assertThat( + new HashSet<>(RESTORED_OPERATORS.values()), + contains(restore.getRestoreCheckpointId())); } @Test @@ -118,7 +124,10 @@ public class RestoreStreamTaskTest extends TestLogger { new CounterOperator(), Optional.of(restore)); - assertEquals(Collections.singleton(tailOperatorID), RESTORED_OPERATORS); + assertEquals(Collections.singleton(tailOperatorID), RESTORED_OPERATORS.keySet()); + assertThat( + new HashSet<>(RESTORED_OPERATORS.values()), + contains(restore.getRestoreCheckpointId())); } @Test @@ -143,7 +152,10 @@ public class RestoreStreamTaskTest extends TestLogger { new CounterOperator(), Optional.of(restore)); - assertEquals(Collections.singleton(headOperatorID), RESTORED_OPERATORS); + assertEquals(Collections.singleton(headOperatorID), RESTORED_OPERATORS.keySet()); + assertThat( + new HashSet<>(RESTORED_OPERATORS.values()), + contains(restore.getRestoreCheckpointId())); } @Test @@ -177,7 +189,11 @@ public class RestoreStreamTaskTest extends TestLogger { Optional.of(restore)); assertEquals( - new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS); + new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), + RESTORED_OPERATORS.keySet()); + assertThat( + new HashSet<>(RESTORED_OPERATORS.values()), + contains(restore.getRestoreCheckpointId())); } @Test @@ -204,7 +220,11 @@ public class RestoreStreamTaskTest extends TestLogger { Optional.of(restore)); assertEquals( - new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS); + new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), + RESTORED_OPERATORS.keySet()); + assertThat( + new HashSet<>(RESTORED_OPERATORS.values()), + contains(restore.getRestoreCheckpointId())); } private JobManagerTaskRestore createRunAndCheckpointOperatorChain( @@ -303,8 +323,13 @@ public class RestoreStreamTaskTest extends TestLogger { @Override public void initializeState(StateInitializationContext context) throws Exception { + assertEquals( + "Restored context id should be set iff is restored", + context.isRestored(), + context.getRestoredCheckpointId().isPresent()); if (context.isRestored()) { - RESTORED_OPERATORS.add(getOperatorID()); + RESTORED_OPERATORS.put( + getOperatorID(), context.getRestoredCheckpointId().getAsLong()); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index e38c459..07e40bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -160,6 +160,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -2278,6 +2279,11 @@ public class StreamTaskTest extends TestLogger { } @Override + public OptionalLong getRestoredCheckpointId() { + return controller.getRestoredCheckpointId(); + } + + @Override public OperatorStateBackend operatorStateBackend() { return controller.operatorStateBackend(); }