flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [flink] 01/11: [FLINK-23854][datastream] Expose the restored checkpoint id in ManagedInitializationContext.
Date Wed, 01 Sep 2021 06:29:23 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3076a23b1c7b14307160eba31af40bb4f4d72863
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Tue Aug 24 16:24:01 2021 +0200

    [FLINK-23854][datastream] Expose the restored checkpoint id in ManagedInitializationContext.
---
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java      |  2 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java          |  6 +++
 .../PrioritizedOperatorSubtaskState.java           | 44 ++++++++++++++++------
 .../checkpoint/StateAssignmentOperation.java       | 12 ++----
 .../flink/runtime/executiongraph/Execution.java    |  2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |  4 +-
 .../state/ManagedInitializationContext.java        | 13 +++++--
 .../state/StateInitializationContextImpl.java      | 19 ++++++++--
 .../flink/runtime/state/TaskStateManagerImpl.java  |  7 +++-
 .../runtime/state/TaskStateManagerImplTest.java    |  5 +--
 .../flink/runtime/state/TestTaskStateManager.java  |  3 +-
 .../api/operators/StreamOperatorStateContext.java  | 14 ++++++-
 .../api/operators/StreamOperatorStateHandler.java  |  5 ++-
 .../operators/StreamTaskStateInitializerImpl.java  | 15 +++++---
 .../api/operators/SourceOperatorTest.java          |  2 +-
 .../StateInitializationContextImplTest.java        |  4 +-
 .../StreamTaskStateInitializerImplTest.java        |  4 +-
 .../utils/MockFunctionInitializationContext.java   |  7 ++++
 .../source/SourceOperatorEventTimeTest.java        |  2 +-
 .../runtime/tasks/RestoreStreamTaskTest.java       | 41 ++++++++++++++++----
 .../streaming/runtime/tasks/StreamTaskTest.java    |  6 +++
 21 files changed, 158 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index 17142b2..5b5aa02 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -369,7 +369,7 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
 
     static StateInitializationContextImpl buildInitCtx(boolean restored) {
         return new StateInitializationContextImpl(
-                restored,
+                restored ? 1L : null,
                 new DefaultOperatorStateBackend(
                         new ExecutionConfig(),
                         new CloseableRegistry(),
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index d87f73a..add4d2f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -84,6 +84,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -1520,6 +1521,11 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
         }
 
         @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return isRestored ? OptionalLong.of(1L) : OptionalLong.empty();
+        }
+
+        @Override
         public OperatorStateStore getOperatorStateStore() {
             return operatorStateStore;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
index fc4f007..ef9bcd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.commons.lang3.BooleanUtils;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,10 +48,13 @@ import java.util.function.Function;
 @Internal
 public class PrioritizedOperatorSubtaskState {
 
+    private static final OperatorSubtaskState EMPTY_JM_STATE_STATE =
+            OperatorSubtaskState.builder().build();
+
     /** Singleton instance for an empty, non-restored operator state. */
     private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE =
             new PrioritizedOperatorSubtaskState.Builder(
-                            OperatorSubtaskState.builder().build(), Collections.emptyList(), false)
+                            EMPTY_JM_STATE_STATE, Collections.emptyList(), null)
                     .build();
 
     /** List of prioritized snapshot alternatives for managed operator state. */
@@ -70,8 +74,8 @@ public class PrioritizedOperatorSubtaskState {
     private final List<StateObjectCollection<ResultSubpartitionStateHandle>>
             prioritizedResultSubpartitionState;
 
-    /** Signal flag if this represents state for a restored operator. */
-    private final boolean restored;
+    /** Checkpoint id for a restored operator or null if not restored. */
+    private final @Nullable Long restoredCheckpointId;
 
     PrioritizedOperatorSubtaskState(
             @Nonnull List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState,
@@ -86,7 +90,7 @@ public class PrioritizedOperatorSubtaskState {
             @Nonnull
                     List<StateObjectCollection<ResultSubpartitionStateHandle>>
                             prioritizedResultSubpartitionState,
-            boolean restored) {
+            @Nullable Long restoredCheckpointId) {
 
         this.prioritizedManagedOperatorState = prioritizedManagedOperatorState;
         this.prioritizedRawOperatorState = prioritizedRawOperatorState;
@@ -94,7 +98,7 @@ public class PrioritizedOperatorSubtaskState {
         this.prioritizedRawKeyedState = prioritizedRawKeyedState;
         this.prioritizedInputChannelState = prioritizedInputChannelState;
         this.prioritizedResultSubpartitionState = prioritizedResultSubpartitionState;
-        this.restored = restored;
+        this.restoredCheckpointId = restoredCheckpointId;
     }
 
     // -----------------------------------------------------------------------------------------------------------------
@@ -192,7 +196,17 @@ public class PrioritizedOperatorSubtaskState {
      * snapshots.
      */
     public boolean isRestored() {
-        return restored;
+        return restoredCheckpointId != null;
+    }
+
+    /**
+     * Returns the checkpoint id if this was created for a restored operator, null otherwise.
+     * Restored operators are operators that participated in a previous checkpoint, even if they did
+     * not emit any state snapshots.
+     */
+    @Nullable
+    public Long getRestoredCheckpointId() {
+        return restoredCheckpointId;
     }
 
     private static <T extends StateObject> StateObjectCollection<T> lastElement(
@@ -208,6 +222,12 @@ public class PrioritizedOperatorSubtaskState {
         return EMPTY_NON_RESTORED_INSTANCE;
     }
 
+    public static PrioritizedOperatorSubtaskState empty(long restoredCheckpointId) {
+        return new PrioritizedOperatorSubtaskState.Builder(
+                        EMPTY_JM_STATE_STATE, Collections.emptyList(), restoredCheckpointId)
+                .build();
+    }
+
     /** A builder for PrioritizedOperatorSubtaskState. */
     @Internal
     public static class Builder {
@@ -218,23 +238,23 @@ public class PrioritizedOperatorSubtaskState {
         /** (Local) alternatives to the job manager state. */
         @Nonnull private final List<OperatorSubtaskState> alternativesByPriority;
 
-        /** Flag if the states have been restored. */
-        private final boolean restored;
+        /** Checkpoint id of the restored checkpoint or null if not restored. */
+        private final @Nullable Long restoredCheckpointId;
 
         public Builder(
                 @Nonnull OperatorSubtaskState jobManagerState,
                 @Nonnull List<OperatorSubtaskState> alternativesByPriority) {
-            this(jobManagerState, alternativesByPriority, true);
+            this(jobManagerState, alternativesByPriority, null);
         }
 
         public Builder(
                 @Nonnull OperatorSubtaskState jobManagerState,
                 @Nonnull List<OperatorSubtaskState> alternativesByPriority,
-                boolean restored) {
+                @Nullable Long restoredCheckpointId) {
 
             this.jobManagerState = jobManagerState;
             this.alternativesByPriority = alternativesByPriority;
-            this.restored = restored;
+            this.restoredCheckpointId = restoredCheckpointId;
         }
 
         public PrioritizedOperatorSubtaskState build() {
@@ -290,7 +310,7 @@ public class PrioritizedOperatorSubtaskState {
                             jobManagerState.getResultSubpartitionState(),
                             resultSubpartitionStateAlternatives,
                             eqStateApprover(ResultSubpartitionStateHandle::getInfo)),
-                    restored);
+                    restoredCheckpointId);
         }
 
         /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index c409340..69fc65c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -239,7 +239,6 @@ public class StateAssignmentOperation {
             int subTaskIndex,
             Execution currentExecutionAttempt) {
         TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size(), false);
-        boolean statelessTask = true;
 
         for (OperatorIDPair operatorID : operatorIDs) {
             OperatorInstanceID instanceID =
@@ -247,18 +246,13 @@ public class StateAssignmentOperation {
 
             OperatorSubtaskState operatorSubtaskState = assignment.getSubtaskState(instanceID);
 
-            if (operatorSubtaskState.hasState()) {
-                statelessTask = false;
-            }
             taskState.putSubtaskStateByOperatorID(
                     operatorID.getGeneratedOperatorID(), operatorSubtaskState);
         }
 
-        if (!statelessTask) {
-            JobManagerTaskRestore taskRestore =
-                    new JobManagerTaskRestore(restoreCheckpointId, taskState);
-            currentExecutionAttempt.setInitialState(taskRestore);
-        }
+        JobManagerTaskRestore taskRestore =
+                new JobManagerTaskRestore(restoreCheckpointId, taskState);
+        currentExecutionAttempt.setInitialState(taskRestore);
     }
 
     public void checkParallelismPreconditions(TaskStateAssignment taskStateAssignment) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 79896b2..d29bfc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -356,7 +356,7 @@ public class Execution
      *
      * @param taskRestore information to restore the state
      */
-    public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
+    public void setInitialState(JobManagerTaskRestore taskRestore) {
         this.taskRestore = taskRestore;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6502ad9..265101b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -342,7 +342,9 @@ public class ExecutionVertex
      * that the execution attempt will resume.
      */
     public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() {
-        if (currentExecution.getTaskRestore() != null) {
+        // only restore to same execution if it has state
+        if (currentExecution.getTaskRestore() != null
+                && currentExecution.getTaskRestore().getTaskStateSnapshot().hasState()) {
             return Optional.ofNullable(getLatestPriorLocation());
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index e7465f5..75433a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 
+import java.util.OptionalLong;
+
 /**
  * This interface provides a context in which operators can initialize by registering to managed
  * state (i.e. state that is managed by state backends).
@@ -33,11 +35,16 @@ import org.apache.flink.api.common.state.OperatorStateStore;
  */
 public interface ManagedInitializationContext {
 
+    /** Returns true, if state was restored from the snapshot of a previous execution. */
+    default boolean isRestored() {
+        return getRestoredCheckpointId().isPresent();
+    }
+
     /**
-     * Returns true, if state was restored from the snapshot of a previous execution. This returns
-     * always false for stateless tasks.
+     * Returns id of the restored checkpoint, if state was restored from the snapshot of a previous
+     * execution.
      */
-    boolean isRestored();
+    OptionalLong getRestoredCheckpointId();
 
     /** Returns an interface that allows for registering operator state with the backend. */
     OperatorStateStore getOperatorStateStore();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index f5710ae..7ff901a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -21,11 +21,15 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 
+import javax.annotation.Nullable;
+
+import java.util.OptionalLong;
+
 /** Default implementation of {@link StateInitializationContext}. */
 public class StateInitializationContextImpl implements StateInitializationContext {
 
     /** Signal whether any state to restore was found */
-    private final boolean restored;
+    private final @Nullable Long restoredCheckpointId;
 
     private final OperatorStateStore operatorStateStore;
 
@@ -35,13 +39,13 @@ public class StateInitializationContextImpl implements StateInitializationContex
     private final Iterable<StatePartitionStreamProvider> rawOperatorStateInputs;
 
     public StateInitializationContextImpl(
-            boolean restored,
+            @Nullable Long restoredCheckpointId,
             OperatorStateStore operatorStateStore,
             KeyedStateStore keyedStateStore,
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs,
             Iterable<StatePartitionStreamProvider> rawOperatorStateInputs) {
 
-        this.restored = restored;
+        this.restoredCheckpointId = restoredCheckpointId;
         this.operatorStateStore = operatorStateStore;
         this.keyedStateStore = keyedStateStore;
         this.rawOperatorStateInputs = rawOperatorStateInputs;
@@ -50,7 +54,14 @@ public class StateInitializationContextImpl implements StateInitializationContex
 
     @Override
     public boolean isRestored() {
-        return restored;
+        return restoredCheckpointId != null;
+    }
+
+    @Override
+    public OptionalLong getRestoredCheckpointId() {
+        return restoredCheckpointId == null
+                ? OptionalLong.empty()
+                : OptionalLong.of(restoredCheckpointId);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index cf9fae6..02f21e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -176,7 +176,8 @@ public class TaskStateManagerImpl implements TaskStateManager {
                 jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
 
         if (jobManagerSubtaskState == null) {
-            return PrioritizedOperatorSubtaskState.emptyNotRestored();
+            return PrioritizedOperatorSubtaskState.empty(
+                    jobManagerTaskRestore.getRestoreCheckpointId());
         }
 
         long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId();
@@ -208,7 +209,9 @@ public class TaskStateManagerImpl implements TaskStateManager {
 
         PrioritizedOperatorSubtaskState.Builder builder =
                 new PrioritizedOperatorSubtaskState.Builder(
-                        jobManagerSubtaskState, alternativesByPriority, true);
+                        jobManagerSubtaskState,
+                        alternativesByPriority,
+                        jobManagerTaskRestore.getRestoreCheckpointId());
 
         return builder.build();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 2d2a349..223f673 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -158,9 +158,8 @@ public class TaskStateManagerImplTest extends TestLogger {
 
         Assert.assertTrue(prioritized_1.isRestored());
         Assert.assertTrue(prioritized_2.isRestored());
-        Assert.assertFalse(prioritized_3.isRestored());
-        Assert.assertFalse(
-                taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored());
+        Assert.assertTrue(prioritized_3.isRestored());
+        Assert.assertTrue(taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored());
 
         // checks for operator 1.
         Iterator<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState_1 =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 75fb52f..a52b15a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -184,7 +184,8 @@ public class TestTaskStateManager implements TaskStateManager {
                     }
                 }
                 PrioritizedOperatorSubtaskState.Builder builder =
-                        new PrioritizedOperatorSubtaskState.Builder(jmOpState, tmStateCollection);
+                        new PrioritizedOperatorSubtaskState.Builder(
+                                jmOpState, tmStateCollection, reportedCheckpointId);
                 return builder.build();
             }
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
index 52a4d98..695129c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.util.CloseableIterable;
 
+import java.util.OptionalLong;
+
 /**
  * This interface represents a context from which a stream operator can initialize everything
  * connected to state such as e.g. backends, raw state, and timer service manager.
@@ -31,9 +33,17 @@ import org.apache.flink.util.CloseableIterable;
 public interface StreamOperatorStateContext {
 
     /**
-     * Returns true, the states provided by this context are restored from a checkpoint/savepoint.
+     * Returns true if the states provided by this context are restored from a checkpoint/savepoint.
+     */
+    default boolean isRestored() {
+        return getRestoredCheckpointId().isPresent();
+    }
+
+    /**
+     * Returns non-empty if the states provided by this context are restored from a
+     * checkpoint/savepoint.
      */
-    boolean isRestored();
+    OptionalLong getRestoredCheckpointId();
 
     /** Returns the operator state backend for the stream operator. */
     OperatorStateBackend operatorStateBackend();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index 70b73c7..7311a72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -62,6 +62,7 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -106,10 +107,10 @@ public class StreamOperatorStateHandler {
                 context.rawOperatorStateInputs();
 
         try {
+            OptionalLong checkpointId = context.getRestoredCheckpointId();
             StateInitializationContext initializationContext =
                     new StateInitializationContextImpl(
-                            context.isRestored(), // information whether we restore or start for
-                            // the first time
+                            checkpointId.isPresent() ? checkpointId.getAsLong() : null,
                             operatorStateBackend, // access to operator state backend
                             keyedStateStore, // access to keyed state backend
                             keyedStateInputs, // access to keyed state stream
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 31e567f..3df0b50 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -63,6 +63,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.OptionalLong;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
@@ -219,7 +220,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
             // -------------- Preparing return value --------------
 
             return new StreamOperatorStateContextImpl(
-                    prioritizedOperatorSubtaskStates.isRestored(),
+                    prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),
                     operatorStateBackend,
                     keyedStatedBackend,
                     timeServiceManager,
@@ -635,7 +636,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 
     private static class StreamOperatorStateContextImpl implements StreamOperatorStateContext {
 
-        private final boolean restored;
+        private final @Nullable Long restoredCheckpointId;
 
         private final OperatorStateBackend operatorStateBackend;
         private final CheckpointableKeyedStateBackend<?> keyedStateBackend;
@@ -645,14 +646,14 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
         private final CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;
 
         StreamOperatorStateContextImpl(
-                boolean restored,
+                @Nullable Long restoredCheckpointId,
                 OperatorStateBackend operatorStateBackend,
                 CheckpointableKeyedStateBackend<?> keyedStateBackend,
                 InternalTimeServiceManager<?> internalTimeServiceManager,
                 CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
                 CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
 
-            this.restored = restored;
+            this.restoredCheckpointId = restoredCheckpointId;
             this.operatorStateBackend = operatorStateBackend;
             this.keyedStateBackend = keyedStateBackend;
             this.internalTimeServiceManager = internalTimeServiceManager;
@@ -661,8 +662,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
         }
 
         @Override
-        public boolean isRestored() {
-            return restored;
+        public OptionalLong getRestoredCheckpointId() {
+            return restoredCheckpointId == null
+                    ? OptionalLong.empty()
+                    : OptionalLong.of(restoredCheckpointId);
         }
 
         @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index f6fb558..3bf4671 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -222,7 +222,7 @@ public class SourceOperatorTest {
         // Crate the state context.
         OperatorStateStore operatorStateStore = createOperatorStateStore();
         StateInitializationContext stateContext =
-                new StateInitializationContextImpl(false, operatorStateStore, null, null, null);
+                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
 
         // Update the context.
         stateContext
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index fbf767f..37bcf69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -70,6 +70,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 
 import static org.mockito.Mockito.mock;
@@ -217,9 +218,10 @@ public class StateInitializationContextImplTest {
                         1.0,
                         false);
 
+        OptionalLong restoredCheckpointId = stateContext.getRestoredCheckpointId();
         this.initializationContext =
                 new StateInitializationContextImpl(
-                        stateContext.isRestored(),
+                        restoredCheckpointId.isPresent() ? restoredCheckpointId.getAsLong() : null,
                         stateContext.operatorStateBackend(),
                         mock(KeyedStateStore.class),
                         stateContext.rawKeyedStateInputs(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 3056b15..83a608c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -63,6 +63,7 @@ import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.OptionalLong;
 import java.util.Random;
 
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
@@ -203,7 +204,7 @@ public class StreamTaskStateInitializerImplTest {
         taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
 
         JobManagerTaskRestore jobManagerTaskRestore =
-                new JobManagerTaskRestore(0L, taskStateSnapshot);
+                new JobManagerTaskRestore(42L, taskStateSnapshot);
 
         StreamTaskStateInitializer streamTaskStateManager =
                 streamTaskStateManager(mockingBackend, jobManagerTaskRestore, false);
@@ -236,6 +237,7 @@ public class StreamTaskStateInitializerImplTest {
                 stateContext.rawOperatorStateInputs();
 
         Assert.assertTrue("Expected the context to be restored", stateContext.isRestored());
+        Assert.assertEquals(OptionalLong.of(42L), stateContext.getRestoredCheckpointId());
 
         Assert.assertNotNull(operatorStateBackend);
         Assert.assertNotNull(keyedStateBackend);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java
index 642328c..c257320 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionInitializationContext.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators.collect.utils;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 
+import java.util.OptionalLong;
+
 /** A {@link FunctionInitializationContext} for testing purpose. */
 public class MockFunctionInitializationContext implements FunctionInitializationContext {
 
@@ -35,6 +37,11 @@ public class MockFunctionInitializationContext implements FunctionInitialization
     }
 
     @Override
+    public OptionalLong getRestoredCheckpointId() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public MockOperatorStateStore getOperatorStateStore() {
         return operatorStateStore;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 394c5e2..39f5de4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -238,7 +238,7 @@ public class SourceOperatorEventTimeTest {
                                 new CloseableRegistry());
 
         final StateInitializationContext stateContext =
-                new StateInitializationContextImpl(false, operatorStateStore, null, null, null);
+                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
 
         final SourceOperator<T, MockSourceSplit> sourceOperator =
                 new TestingSourceOperator<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index 0afc853..49cbff1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -45,11 +45,13 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -58,7 +60,7 @@ import static org.junit.Assert.assertEquals;
  */
 public class RestoreStreamTaskTest extends TestLogger {
 
-    private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet();
+    private static final Map<OperatorID, Long> RESTORED_OPERATORS = new ConcurrentHashMap();
 
     @Before
     public void setup() {
@@ -91,7 +93,11 @@ public class RestoreStreamTaskTest extends TestLogger {
                 Optional.of(restore));
 
         assertEquals(
-                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS);
+                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)),
+                RESTORED_OPERATORS.keySet());
+        assertThat(
+                new HashSet<>(RESTORED_OPERATORS.values()),
+                contains(restore.getRestoreCheckpointId()));
     }
 
     @Test
@@ -118,7 +124,10 @@ public class RestoreStreamTaskTest extends TestLogger {
                 new CounterOperator(),
                 Optional.of(restore));
 
-        assertEquals(Collections.singleton(tailOperatorID), RESTORED_OPERATORS);
+        assertEquals(Collections.singleton(tailOperatorID), RESTORED_OPERATORS.keySet());
+        assertThat(
+                new HashSet<>(RESTORED_OPERATORS.values()),
+                contains(restore.getRestoreCheckpointId()));
     }
 
     @Test
@@ -143,7 +152,10 @@ public class RestoreStreamTaskTest extends TestLogger {
                 new CounterOperator(),
                 Optional.of(restore));
 
-        assertEquals(Collections.singleton(headOperatorID), RESTORED_OPERATORS);
+        assertEquals(Collections.singleton(headOperatorID), RESTORED_OPERATORS.keySet());
+        assertThat(
+                new HashSet<>(RESTORED_OPERATORS.values()),
+                contains(restore.getRestoreCheckpointId()));
     }
 
     @Test
@@ -177,7 +189,11 @@ public class RestoreStreamTaskTest extends TestLogger {
                 Optional.of(restore));
 
         assertEquals(
-                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS);
+                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)),
+                RESTORED_OPERATORS.keySet());
+        assertThat(
+                new HashSet<>(RESTORED_OPERATORS.values()),
+                contains(restore.getRestoreCheckpointId()));
     }
 
     @Test
@@ -204,7 +220,11 @@ public class RestoreStreamTaskTest extends TestLogger {
                 Optional.of(restore));
 
         assertEquals(
-                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS);
+                new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)),
+                RESTORED_OPERATORS.keySet());
+        assertThat(
+                new HashSet<>(RESTORED_OPERATORS.values()),
+                contains(restore.getRestoreCheckpointId()));
     }
 
     private JobManagerTaskRestore createRunAndCheckpointOperatorChain(
@@ -303,8 +323,13 @@ public class RestoreStreamTaskTest extends TestLogger {
 
         @Override
         public void initializeState(StateInitializationContext context) throws Exception {
+            assertEquals(
+                    "Restored context id should be set iff is restored",
+                    context.isRestored(),
+                    context.getRestoredCheckpointId().isPresent());
             if (context.isRestored()) {
-                RESTORED_OPERATORS.add(getOperatorID());
+                RESTORED_OPERATORS.put(
+                        getOperatorID(), context.getRestoredCheckpointId().getAsLong());
             }
         }
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e38c459..07e40bd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -160,6 +160,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -2278,6 +2279,11 @@ public class StreamTaskTest extends TestLogger {
                     }
 
                     @Override
+                    public OptionalLong getRestoredCheckpointId() {
+                        return controller.getRestoredCheckpointId();
+                    }
+
+                    @Override
                     public OperatorStateBackend operatorStateBackend() {
                         return controller.operatorStateBackend();
                     }

Mime
View raw message