flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [flink] 02/11: [FLINK-23854][datastream] Pass checkpoint id to SinkWriter#snapshotState and restoredCheckpointId to Sink#InitContext.
Date Wed, 01 Sep 2021 06:29:24 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 341a20e8e107040ba116116566c6af072bddd397
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Thu Aug 26 20:02:54 2021 +0200

    [FLINK-23854][datastream] Pass checkpoint id to SinkWriter#snapshotState and restoredCheckpointId to Sink#InitContext.
---
 .../connector/file/sink/writer/FileWriter.java     |  2 +-
 .../connector/file/sink/writer/FileWriterTest.java |  8 +++----
 .../flink/connector/kafka/sink/KafkaWriter.java    |  2 +-
 .../kafka/table/ReducingUpsertWriter.java          |  4 ++--
 .../connector/kafka/sink/KafkaWriterITCase.java    |  6 +++++
 .../org/apache/flink/api/connector/sink/Sink.java  | 15 +++++++++----
 .../flink/api/connector/sink/SinkWriter.java       | 14 +++++++++++-
 .../runtime/operators/sink/SinkOperator.java       | 26 +++++++++++++++++-----
 .../operators/sink/SinkWriterStateHandler.java     | 13 ++++++++---
 .../sink/StatefulSinkWriterStateHandler.java       |  8 ++++---
 .../sink/StatelessSinkWriterStateHandler.java      |  5 +++--
 .../operators/sink/SinkWriterOperatorTest.java     | 11 +++++++--
 .../streaming/runtime/operators/sink/TestSink.java |  2 +-
 13 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 22b54b9..ab133ab 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -205,7 +205,7 @@ public class FileWriter<IN>
     }
 
     @Override
-    public List<FileWriterBucketState> snapshotState() throws IOException {
+    public List<FileWriterBucketState> snapshotState(long checkpointId) throws IOException
{
         checkState(bucketWriter != null, "sink has not been initialized");
 
         List<FileWriterBucketState> state = new ArrayList<>();
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 2115f37..9f5a051 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -98,7 +98,7 @@ public class FileWriterTest {
         assertEquals(3, fileWriter.getActiveBuckets().size());
 
         fileWriter.prepareCommit(false);
-        List<FileWriterBucketState> states = fileWriter.snapshotState();
+        List<FileWriterBucketState> states = fileWriter.snapshotState(1L);
         assertEquals(3, states.size());
 
         fileWriter =
@@ -131,7 +131,7 @@ public class FileWriterTest {
         firstFileWriter.write("test3", new ContextImpl());
 
         firstFileWriter.prepareCommit(false);
-        List<FileWriterBucketState> firstState = firstFileWriter.snapshotState();
+        List<FileWriterBucketState> firstState = firstFileWriter.snapshotState(1L);
 
         FileWriter<String> secondFileWriter =
                 createWriter(
@@ -143,7 +143,7 @@ public class FileWriterTest {
         secondFileWriter.write("test2", new ContextImpl());
 
         secondFileWriter.prepareCommit(false);
-        List<FileWriterBucketState> secondState = secondFileWriter.snapshotState();
+        List<FileWriterBucketState> secondState = secondFileWriter.snapshotState(1L);
 
         List<FileWriterBucketState> mergedState = new ArrayList<>();
         mergedState.addAll(firstState);
@@ -183,7 +183,7 @@ public class FileWriterTest {
 
         fileWriter.write("test", new ContextImpl());
         fileWriter.prepareCommit(false);
-        fileWriter.snapshotState();
+        fileWriter.snapshotState(1L);
 
         // No more records and another call to prepareCommit will makes it inactive
         fileWriter.prepareCommit(false);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 1aac4be..3ace69e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -178,7 +178,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable,
KafkaWriterSta
     }
 
     @Override
-    public List<KafkaWriterState> snapshotState() throws IOException {
+    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException
{
         return ImmutableList.of(kafkaWriterState);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
index aa178af..fedde0e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
@@ -91,8 +91,8 @@ class ReducingUpsertWriter<WriterState> implements SinkWriter<RowData,
Void, Wri
     }
 
     @Override
-    public List<WriterState> snapshotState() throws IOException {
-        return wrappedWriter.snapshotState();
+    public List<WriterState> snapshotState(long checkpointId) throws IOException {
+        return wrappedWriter.snapshotState(checkpointId);
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index fa2f19e..105c187 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -59,6 +59,7 @@ import java.time.Duration;
 import java.util.Comparator;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.stream.IntStream;
 
@@ -258,6 +259,11 @@ public class KafkaWriterITCase {
         public SinkWriterMetricGroup metricGroup() {
             return metricGroup;
         }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return OptionalLong.of(0L);
+        }
     }
 
     private static class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer>
{
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
index 2d1a18c..1056d91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 /**
  * This interface lets the sink developer build a simple sink topology, which could guarantee
the
@@ -58,15 +59,15 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT>
extends Serializ
     /**
      * Create a {@link SinkWriter}. If the application is resumed from a checkpoint or savepoint
and
      * the sink is stateful, it will receive the corresponding state obtained with {@link
-     * SinkWriter#snapshotState()} and serialized with {@link #getWriterStateSerializer()}.
If no
-     * state exists, the first existing, compatible state specified in {@link
+     * SinkWriter#snapshotState(long)} and serialized with {@link #getWriterStateSerializer()}.
If
+     * no state exists, the first existing, compatible state specified in {@link
      * #getCompatibleStateNames()} will be loaded and passed.
      *
      * @param context the runtime context.
      * @param states the writer's previous state.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
-     * @see SinkWriter#snapshotState()
+     * @see SinkWriter#snapshotState(long)
      * @see #getWriterStateSerializer()
      * @see #getCompatibleStateNames()
      */
@@ -75,7 +76,7 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends
Serializ
 
     /**
      * Any stateful sink needs to provide this state serializer and implement {@link
-     * SinkWriter#snapshotState()} properly. The respective state is used in {@link
+     * SinkWriter#snapshotState(long)} properly. The respective state is used in {@link
      * #createWriter(InitContext, List)} on recovery.
      *
      * @return the serializer of the writer's state type.
@@ -163,6 +164,12 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT>
extends Serializ
 
         /** @return The metric group this writer belongs to. */
         SinkWriterMetricGroup metricGroup();
+
+        /**
+         * Returns the id of the restored checkpoint, if state was restored from the snapshot of a
+         * previous execution.
+         */
+        OptionalLong getRestoredCheckpointId();
     }
 
     /**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
index d1b1c62..1695e8a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.eventtime.Watermark;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -71,8 +72,19 @@ public interface SinkWriter<InputT, CommT, WriterStateT> extends
AutoCloseable {
     /**
      * @return The writer's state.
      * @throws IOException if fail to snapshot writer's state.
+     * @deprecated implement {@link #snapshotState(long)}
      */
-    List<WriterStateT> snapshotState() throws IOException;
+    default List<WriterStateT> snapshotState() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
+     */
+    default List<WriterStateT> snapshotState(long checkpointId) throws IOException
{
+        return snapshotState();
+    }
 
     /** Context that {@link #write} can use for getting additional data about an input record.
*/
     interface Context {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
index 8db1858..4eac9ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.OptionalLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -130,16 +131,19 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
+        OptionalLong checkpointId = context.getRestoredCheckpointId();
         sinkWriter =
                 writerFactory.apply(
-                        createInitContext(), sinkWriterStateHandler.initializeState(context));
+                        createInitContext(
+                                checkpointId.isPresent() ? checkpointId.getAsLong() : null),
+                        sinkWriterStateHandler.initializeState(context));
         committerHandler.initializeState(context);
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        sinkWriterStateHandler.snapshotState(sinkWriter::snapshotState);
+        sinkWriterStateHandler.snapshotState(sinkWriter::snapshotState, context.getCheckpointId());
         committerHandler.snapshotState(context);
     }
 
@@ -196,12 +200,13 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
         super.close();
     }
 
-    private Sink.InitContext createInitContext() {
+    private Sink.InitContext createInitContext(@Nullable Long restoredCheckpointId) {
         return new InitContextImpl(
                 getRuntimeContext(),
                 processingTimeService,
                 mailboxExecutor,
-                InternalSinkWriterMetricGroup.wrap(getMetricGroup()));
+                InternalSinkWriterMetricGroup.wrap(getMetricGroup()),
+                restoredCheckpointId);
     }
 
     private class Context<IN> implements SinkWriter.Context {
@@ -230,17 +235,21 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
 
         private final SinkWriterMetricGroup metricGroup;
 
+        @Nullable private final Long restoredCheckpointId;
+
         private final StreamingRuntimeContext runtimeContext;
 
         public InitContextImpl(
                 StreamingRuntimeContext runtimeContext,
                 ProcessingTimeService processingTimeService,
                 MailboxExecutor mailboxExecutor,
-                SinkWriterMetricGroup metricGroup) {
+                SinkWriterMetricGroup metricGroup,
+                @Nullable Long restoredCheckpointId) {
             this.runtimeContext = checkNotNull(runtimeContext);
             this.mailboxExecutor = checkNotNull(mailboxExecutor);
             this.processingTimeService = checkNotNull(processingTimeService);
             this.metricGroup = checkNotNull(metricGroup);
+            this.restoredCheckpointId = restoredCheckpointId;
         }
 
         @Override
@@ -284,6 +293,13 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
         public SinkWriterMetricGroup metricGroup() {
             return metricGroup;
         }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return restoredCheckpointId == null
+                    ? OptionalLong.empty()
+                    : OptionalLong.of(restoredCheckpointId);
+        }
     }
 
     private static class ProcessingTimerServiceImpl implements Sink.ProcessingTimeService
{
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
index a76285e..2f2d2be 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.Serializable;
 import java.util.List;
@@ -37,7 +37,14 @@ interface SinkWriterStateHandler<WriterStateT> extends Serializable
{
      */
     List<WriterStateT> initializeState(StateInitializationContext context) throws Exception;
 
-    /** Stores the state of the supplier. The supplier should only be queried once. */
-    void snapshotState(SupplierWithException<List<WriterStateT>, Exception> stateSupplier)
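+    /**
+     * Stores the state returned by the extractor. The extractor should be queried at most once.
+     *
+     * @param stateExtractor function that maps the checkpoint id to the writer state to store
+     * @param checkpointId id of the checkpoint being taken, handed to the extractor when queried
+     */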
+    void snapshotState(
+            FunctionWithException<Long, List<WriterStateT>, Exception> stateExtractor,
+            long checkpointId)
             throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index e2d66bd..3130196 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
@@ -99,9 +99,11 @@ final class StatefulSinkWriterStateHandler<WriterStateT>
     }
 
     @Override
-    public void snapshotState(SupplierWithException<List<WriterStateT>, Exception>
stateSupplier)
+    public void snapshotState(
+            FunctionWithException<Long, List<WriterStateT>, Exception> stateExtractor,
+            long checkpointId)
             throws Exception {
-        writerState.update(stateSupplier.get());
+        writerState.update(stateExtractor.apply(checkpointId));
         previousSinkStates.forEach(ListState::clear);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
index e5925e5..976310e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import java.util.Collections;
 import java.util.List;
@@ -38,6 +38,7 @@ enum StatelessSinkWriterStateHandler implements SinkWriterStateHandler<Object>
{
     }
 
     @Override
-    public void snapshotState(SupplierWithException<List<Object>, Exception>
stateSupplier)
+    public void snapshotState(
+            FunctionWithException<Long, List<Object>, Exception> stateExtractor,
long checkpointId)
             throws Exception {}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
index cd560c8..a9d826f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
@@ -208,8 +208,9 @@ public class SinkWriterOperatorTest extends TestLogger {
     public void stateIsRestored() throws Exception {
         final long initialTime = 0;
 
+        SnapshottingBufferingSinkWriter snapshottingWriter = new SnapshottingBufferingSinkWriter();
         final OneInputStreamOperatorTestHarness<Integer, byte[]> testHarness =
-                createTestHarness(new SnapshottingBufferingSinkWriter());
+                createTestHarness(snapshottingWriter);
 
         testHarness.open();
 
@@ -222,6 +223,9 @@ public class SinkWriterOperatorTest extends TestLogger {
 
         // we only see the watermark, so the committables must be stored in state
         assertThat(testHarness.getOutput(), contains(new Watermark(initialTime)));
+        assertThat(
+                snapshottingWriter.lastCheckpointId,
+                equalTo(stateful ? 1L : SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED));
 
         testHarness.close();
 
@@ -316,9 +320,12 @@ public class SinkWriterOperatorTest extends TestLogger {
 
     /** A {@link SinkWriter} buffers elements and snapshots them when asked. */
     static class SnapshottingBufferingSinkWriter extends BufferingSinkWriter {
+        public static final int NOT_SNAPSHOTTED = -1;
+        long lastCheckpointId = NOT_SNAPSHOTTED;
 
         @Override
-        public List<String> snapshotState() {
+        public List<String> snapshotState(long checkpointId) throws IOException {
+            lastCheckpointId = checkpointId;
             return elements;
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
index d635ba0..ca31a6a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
@@ -254,7 +254,7 @@ public class TestSink<T> implements Sink<T, String, String, String>
{
         }
 
         @Override
-        public List<String> snapshotState() {
+        public List<String> snapshotState(long checkpointId) throws IOException {
             return Collections.emptyList();
         }
 

Mime
View raw message