flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [flink] 03/11: [FLINK-23896][streaming] Implement retrying for failed committables for Sinks.
Date Wed, 01 Sep 2021 06:29:25 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9061fdb09abceefce9c82d41a46a1ea7da16eecd
Author: Fabian Paul <fabianpaul@ververica.com>
AuthorDate: Mon Aug 23 15:40:31 2021 +0200

    [FLINK-23896][streaming] Implement retrying for failed committables for Sinks.
    
    In batch mode and on stop-with-savepoint with drain, the CommitRetrier will retry indefinitely in a tight loop to avoid data loss.
    Otherwise, it will retry every second until all commits succeed.
---
 .../operators/sink/AbstractCommitterHandler.java   |  42 ++++++-
 .../sink/AbstractStreamingCommitterHandler.java    |  49 +++++----
 .../operators/sink/BatchCommitterHandler.java      |  22 +++-
 .../runtime/operators/sink/CommitRetrier.java      |  84 ++++++++++++++
 .../runtime/operators/sink/CommitterHandler.java   |  11 ++
 .../runtime/operators/sink/CommitterOperator.java  |  43 +++-----
 .../operators/sink/CommitterOperatorFactory.java   |   3 +-
 .../operators/sink/ForwardCommittingHandler.java   |  12 +-
 .../sink/GlobalBatchCommitterHandler.java          |  21 ++--
 .../sink/GlobalStreamingCommitterHandler.java      |  27 ++---
 .../operators/sink/NoopCommitterHandler.java       |   9 ++
 .../runtime/operators/sink/SinkOperator.java       |  20 +++-
 .../operators/sink/StreamingCommitterHandler.java  |  16 +--
 .../runtime/tasks/TestProcessingTimeService.java   |   4 +
 .../operators/sink/BatchCommitterHandlerTest.java  |  10 +-
 .../runtime/operators/sink/CommitRetrierTest.java  | 122 +++++++++++++++++++++
 .../sink/GlobalBatchCommitterHandlerTest.java      |  11 +-
 .../sink/GlobalStreamingCommitterHandlerTest.java  |  35 +++++-
 .../sink/StreamingCommitterHandlerTest.java        |  16 ++-
 .../streaming/runtime/operators/sink/TestSink.java |  36 +++++-
 20 files changed, 465 insertions(+), 128 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
index c7548a9..9cb570c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
@@ -19,18 +19,58 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.util.function.SupplierWithException;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 
-abstract class AbstractCommitterHandler<InputT, OutputT>
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractCommitterHandler<InputT, OutputT, RecoverT>
         implements CommitterHandler<InputT, OutputT> {
 
     /** Record all the committables until commit. */
     private final Deque<InputT> committables = new ArrayDeque<>();
 
+    /** The committables that need to be committed again after recovering from a failover. */
+    private final List<RecoverT> recoveredCommittables = new ArrayList<>();
+
+    /**
+     * Notifies a list of committables that might need to be committed again after recovering from a
+     * failover.
+     *
+     * @param recovered A list of committables
+     */
+    protected void recoveredCommittables(List<RecoverT> recovered) throws IOException {
+        recoveredCommittables.addAll(checkNotNull(recovered));
+    }
+
+    protected List<RecoverT> prependRecoveredCommittables(List<RecoverT> committables) {
+        if (recoveredCommittables.isEmpty()) {
+            return committables;
+        }
+        List<RecoverT> all = new ArrayList<>(recoveredCommittables.size() + committables.size());
+        all.addAll(recoveredCommittables);
+        all.addAll(committables);
+        recoveredCommittables.clear();
+        return all;
+    }
+
+    @Override
+    public boolean needsRetry() {
+        return !recoveredCommittables.isEmpty();
+    }
+
+    @Override
+    public void retry() throws IOException, InterruptedException {
+        retry(prependRecoveredCommittables(Collections.emptyList()));
+    }
+
+    protected abstract void retry(List<RecoverT> recoveredCommittables)
+            throws IOException, InterruptedException;
+
     @Override
     public List<OutputT> processCommittables(
             SupplierWithException<List<InputT>, Exception> committableSupplier) throws Exception {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java
index f67f5306..2efd949 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterHandler.java
@@ -50,7 +50,7 @@ import java.util.TreeMap;
  * @param <CommT> The committable type of the {@link Committer}.
  */
 abstract class AbstractStreamingCommitterHandler<InputT, CommT>
-        extends AbstractCommitterHandler<InputT, CommT> {
+        extends AbstractCommitterHandler<InputT, CommT, CommT> {
     private static final Logger LOG =
             LoggerFactory.getLogger(AbstractStreamingCommitterHandler.class);
 
@@ -69,14 +69,6 @@ abstract class AbstractStreamingCommitterHandler<InputT, CommT>
     private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
 
     /**
-     * Notifies a list of committables that might need to be committed again after recovering from a
-     * failover.
-     *
-     * @param committables A list of committables
-     */
-    abstract void recoveredCommittables(List<CommT> committables) throws IOException;
-
-    /**
      * Prepares a commit.
      *
      * @param input A list of input elements received since last pre-commit
@@ -99,6 +91,12 @@ abstract class AbstractStreamingCommitterHandler<InputT, CommT>
     }
 
     @Override
+    protected void retry(List<CommT> recoveredCommittables)
+            throws IOException, InterruptedException {
+        recoveredCommittables(commit(recoveredCommittables));
+    }
+
+    @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
         streamingCommitterState =
@@ -122,24 +120,27 @@ abstract class AbstractStreamingCommitterHandler<InputT, CommT>
     }
 
     protected List<CommT> commitUpTo(long checkpointId) throws IOException, InterruptedException {
-        final Iterator<Map.Entry<Long, List<CommT>>> it =
-                committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
-
-        final List<CommT> readyCommittables = new ArrayList<>();
-
-        while (it.hasNext()) {
-            final Map.Entry<Long, List<CommT>> entry = it.next();
-            final List<CommT> committables = entry.getValue();
-
-            readyCommittables.addAll(committables);
-            it.remove();
+        NavigableMap<Long, List<CommT>> headMap =
+                committablesPerCheckpoint.headMap(checkpointId, true);
+        final List<CommT> readyCommittables;
+        if (headMap.size() == 1) {
+            readyCommittables = headMap.pollFirstEntry().getValue();
+        } else {
+
+            readyCommittables = new ArrayList<>();
+
+            final Iterator<Map.Entry<Long, List<CommT>>> it = headMap.entrySet().iterator();
+            while (it.hasNext()) {
+                final Map.Entry<Long, List<CommT>> entry = it.next();
+                final List<CommT> committables = entry.getValue();
+
+                readyCommittables.addAll(committables);
+                it.remove();
+            }
         }
 
         LOG.info("Committing the state for checkpoint {}", checkpointId);
-        final List<CommT> neededToRetryCommittables = commit(readyCommittables);
-        if (!neededToRetryCommittables.isEmpty()) {
-            throw new UnsupportedOperationException("Currently does not support the re-commit!");
-        }
+        recoveredCommittables(commit(readyCommittables));
         return readyCommittables;
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
index 01cec40..c6cd6bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
@@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <InputT> The committable type of the {@link Committer}.
  */
 final class BatchCommitterHandler<InputT, OutputT>
-        extends AbstractCommitterHandler<InputT, OutputT> {
+        extends AbstractCommitterHandler<InputT, OutputT, InputT> {
 
     /** Responsible for committing the committable to the external system. */
     private final Committer<InputT> committer;
@@ -59,14 +59,24 @@ final class BatchCommitterHandler<InputT, OutputT>
     }
 
     @Override
+    public boolean needsRetry() {
+        return super.needsRetry() || chainedHandler.needsRetry();
+    }
+
+    @Override
+    protected void retry(List<InputT> recoveredCommittables)
+            throws IOException, InterruptedException {
+        if (!recoveredCommittables.isEmpty()) {
+            recoveredCommittables(committer.commit(recoveredCommittables));
+        }
+        chainedHandler.retry();
+    }
+
+    @Override
     public List<OutputT> endOfInput() throws IOException, InterruptedException {
         List<InputT> allCommittables = pollCommittables();
         if (!allCommittables.isEmpty()) {
-            final List<InputT> neededRetryCommittables = committer.commit(allCommittables);
-            if (!neededRetryCommittables.isEmpty()) {
-                throw new UnsupportedOperationException(
-                        "Currently does not support the re-commit!");
-            }
+            recoveredCommittables(committer.commit(allCommittables));
         }
         return chainedHandler.endOfInput();
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
new file mode 100644
index 0000000..85205a0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+class CommitRetrier {
+    private final ProcessingTimeService processingTimeService;
+    private final CommitterHandler<?, ?> committerHandler;
+    private final Clock clock;
+    @VisibleForTesting static final int RETRY_DELAY = 1000;
+
+    public CommitRetrier(
+            ProcessingTimeService processingTimeService, CommitterHandler<?, ?> committerHandler) {
+        this(processingTimeService, committerHandler, SystemClock.getInstance());
+    }
+
+    @VisibleForTesting
+    public CommitRetrier(
+            ProcessingTimeService processingTimeService,
+            CommitterHandler<?, ?> committerHandler,
+            Clock clock) {
+        this.processingTimeService = checkNotNull(processingTimeService);
+        this.committerHandler = checkNotNull(committerHandler);
+        this.clock = clock;
+    }
+
+    public void retryWithDelay() {
+        retryAt(clock.absoluteTimeMillis() + RETRY_DELAY);
+    }
+
+    private void retryAt(long timestamp) {
+        if (committerHandler.needsRetry()) {
+            processingTimeService.registerTimer(
+                    timestamp,
+                    ts -> {
+                        if (retry(1)) {
+                            retryAt(ts + RETRY_DELAY);
+                        }
+                    });
+        }
+    }
+
+    public void retryIndefinitely() throws IOException, InterruptedException {
+        retry(Long.MAX_VALUE);
+    }
+
+    @VisibleForTesting
+    boolean retry(long tries) throws IOException, InterruptedException {
+        for (long i = 0; i < tries; i++) {
+            if (!committerHandler.needsRetry()) {
+                return false;
+            }
+            committerHandler.retry();
+        }
+        return committerHandler.needsRetry();
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
index e642944..26ce1da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
@@ -66,4 +66,15 @@ interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable
             throws IOException, InterruptedException {
         return Collections.emptyList();
     }
+
+    boolean needsRetry();
+
+    /**
+     * Retries all recovered committables. These committables may either have been restored in
+     * {@link #initializeState(StateInitializationContext)} or have been re-added in any of the
+     * committing functions.
+     *
+     * <p>Call {@link #needsRetry()} afterwards to check whether more committables can be retried.
+     */
+    void retry() throws IOException, InterruptedException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index d8e1653..125ec06 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -25,13 +25,12 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An operator that processes committables of a {@link org.apache.flink.api.connector.sink.Sink}.
  *
@@ -57,23 +56,16 @@ class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
 
     private final SimpleVersionedSerializer<InputT> inputSerializer;
     private final CommitterHandler<InputT, OutputT> committerHandler;
-    @Nullable private final SimpleVersionedSerializer<OutputT> outputSerializer;
-
-    public CommitterOperator(
-            SimpleVersionedSerializer<InputT> inputSerializer,
-            CommitterHandler<InputT, OutputT> committerHandler,
-            SimpleVersionedSerializer<OutputT> outputSerializer) {
-        this.inputSerializer = inputSerializer;
-        this.committerHandler = committerHandler;
-        this.outputSerializer = outputSerializer;
-    }
+    private final CommitRetrier commitRetrier;
 
     public CommitterOperator(
+            ProcessingTimeService processingTimeService,
             SimpleVersionedSerializer<InputT> inputSerializer,
             CommitterHandler<InputT, OutputT> committerHandler) {
-        this.inputSerializer = inputSerializer;
-        this.committerHandler = committerHandler;
-        this.outputSerializer = null;
+        this.inputSerializer = checkNotNull(inputSerializer);
+        this.committerHandler = checkNotNull(committerHandler);
+        this.processingTimeService = processingTimeService;
+        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
     }
 
     @Override
@@ -90,24 +82,15 @@ class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
 
     @Override
     public void endInput() throws Exception {
-        emitCommittables(committerHandler.endOfInput());
+        committerHandler.endOfInput();
+        commitRetrier.retryIndefinitely();
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        emitCommittables(committerHandler.notifyCheckpointCompleted(checkpointId));
-    }
-
-    private void emitCommittables(Collection<OutputT> committables) throws IOException {
-        if (outputSerializer != null) {
-            for (OutputT committable : committables) {
-                output.collect(
-                        new StreamRecord<>(
-                                SimpleVersionedSerialization.writeVersionAndSerialize(
-                                        outputSerializer, committable)));
-            }
-        }
+        committerHandler.notifyCheckpointCompleted(checkpointId);
+        commitRetrier.retryWithDelay();
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index bbef434..9ce3cc2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -74,7 +74,8 @@ public final class CommitterOperatorFactory<CommT, GlobalCommT>
                     !(committerHandler instanceof NoopCommitterHandler),
                     "committer operator without commmitter");
             final CommitterOperator<CommT, GlobalCommT> committerOperator =
-                    new CommitterOperator<>(committableSerializer, committerHandler);
+                    new CommitterOperator<>(
+                            processingTimeService, committableSerializer, committerHandler);
             committerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java
index d8a3ea2..e544611 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.util.function.SupplierWithException;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -26,10 +27,19 @@ import java.util.List;
  * SinkOperator} without committers but with downstream operators (in streaming, only global
  * committer on sink; in batch, committer or global committer present).
  */
-class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, CommT> {
+class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, CommT, CommT> {
+    ForwardCommittingHandler() {}
+
     @Override
     public List<CommT> processCommittables(
             SupplierWithException<List<CommT>, Exception> committableSupplier) throws Exception {
         return committableSupplier.get();
     }
+
+    @Override
+    protected void retry(List<CommT> recoveredCommittables)
+            throws IOException, InterruptedException {
+        throw new UnsupportedOperationException(
+                "This handler should never receive recovered commits");
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java
index 8e00f15..ae137d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink.GlobalCommitter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -33,7 +36,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <GlobalCommT> The committable type of the {@link GlobalCommitter}
  */
 final class GlobalBatchCommitterHandler<CommT, GlobalCommT>
-        extends AbstractCommitterHandler<CommT, GlobalCommT> {
+        extends AbstractCommitterHandler<CommT, GlobalCommT, GlobalCommT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlobalBatchCommitterHandler.class);
 
     /**
      * Aggregate committables to global committables and commit the global committables to the
@@ -50,18 +55,20 @@ final class GlobalBatchCommitterHandler<CommT, GlobalCommT>
         List<CommT> allCommittables = pollCommittables();
         if (!allCommittables.isEmpty()) {
             final GlobalCommT globalCommittable = globalCommitter.combine(allCommittables);
-            final List<GlobalCommT> neededRetryCommittables =
-                    globalCommitter.commit(Collections.singletonList(globalCommittable));
-            if (!neededRetryCommittables.isEmpty()) {
-                throw new UnsupportedOperationException(
-                        "Currently does not support the re-commit!");
-            }
+            recoveredCommittables(
+                    globalCommitter.commit(Collections.singletonList(globalCommittable)));
         }
         globalCommitter.endOfInput();
         return Collections.emptyList();
     }
 
     @Override
+    protected void retry(List<GlobalCommT> recoveredCommittables)
+            throws IOException, InterruptedException {
+        recoveredCommittables(globalCommitter.commit(recoveredCommittables));
+    }
+
+    @Override
     public void close() throws Exception {
         globalCommitter.close();
         super.close();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java
index 23d979b..1c29aa4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,12 +45,6 @@ public final class GlobalStreamingCommitterHandler<CommT, GlobalCommT>
      */
     private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
 
-    /**
-     * The global committables that might need to be committed again after recovering from a
-     * failover.
-     */
-    private final List<GlobalCommT> recoveredGlobalCommittables;
-
     private boolean endOfInput;
 
     public GlobalStreamingCommitterHandler(
@@ -60,27 +53,21 @@ public final class GlobalStreamingCommitterHandler<CommT, GlobalCommT>
         super(committableSerializer);
         this.globalCommitter = checkNotNull(globalCommitter);
 
-        this.recoveredGlobalCommittables = new ArrayList<>();
         this.endOfInput = false;
     }
 
     @Override
-    void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
-        final List<GlobalCommT> recovered =
-                globalCommitter.filterRecoveredCommittables(checkNotNull(committables));
-        recoveredGlobalCommittables.addAll(recovered);
+    protected void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
+        super.recoveredCommittables(
+                globalCommitter.filterRecoveredCommittables(checkNotNull(committables)));
     }
 
     @Override
     List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
-        checkNotNull(input);
-        final List<GlobalCommT> result = new ArrayList<>(recoveredGlobalCommittables);
-        recoveredGlobalCommittables.clear();
-
-        if (!input.isEmpty()) {
-            result.add(globalCommitter.combine(input));
-        }
-        return result;
+        return prependRecoveredCommittables(
+                input.isEmpty()
+                        ? Collections.emptyList()
+                        : Collections.singletonList(globalCommitter.combine(input)));
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java
index fc48cbf..bca5e9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.util.function.SupplierWithException;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -39,4 +40,12 @@ enum NoopCommitterHandler implements CommitterHandler<Object, Object> {
 
     @Override
     public void close() throws Exception {}
+
+    @Override
+    public boolean needsRetry() {
+        return false;
+    }
+
+    @Override
+    public void retry() throws IOException, InterruptedException {}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
index 4eac9ee..d01434b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
@@ -27,14 +27,17 @@ import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.function.BiFunctionWithException;
 
@@ -80,13 +83,16 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
     // ------------------------------- runtime fields ---------------------------------------
 
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
-    private Long currentWatermark;
+    private Long currentWatermark = Long.MIN_VALUE;
 
     private SinkWriter<InputT, CommT, WriterStateT> sinkWriter;
 
     private final SinkWriterStateHandler<WriterStateT> sinkWriterStateHandler;
 
     private final CommitterHandler<CommT, CommT> committerHandler;
+
+    private CommitRetrier commitRetrier;
+
     @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
 
     private final BiFunctionWithException<
@@ -118,13 +124,15 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
         this.committerHandler = checkNotNull(committerHandler);
         this.committableSerializer = committableSerializer;
         this.context = new Context<>();
+        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
     }
 
     @Override
-    public void open() throws Exception {
-        super.open();
-
-        this.currentWatermark = Long.MIN_VALUE;
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<byte[]>> output) {
+        super.setup(containingTask, config, output);
         numRecordsOutCounter = getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
     }
 
@@ -165,6 +173,7 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
         emitCommittables(committerHandler.notifyCheckpointCompleted(checkpointId));
+        commitRetrier.retryWithDelay();
     }
 
     @Override
@@ -180,6 +189,7 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
         emitCommittables(
                 committerHandler.processCommittables(() -> sinkWriter.prepareCommit(true)));
         emitCommittables(committerHandler.endOfInput());
+        commitRetrier.retryIndefinitely();
     }
 
     private void emitCommittables(Collection<CommT> committables) throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandler.java
index 062dabb..e59f34d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -36,9 +35,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 final class StreamingCommitterHandler<CommT>
         extends AbstractStreamingCommitterHandler<CommT, CommT> {
 
-    /** The committables that might need to be committed again after recovering from a failover. */
-    private final List<CommT> recoveredCommittables;
-
     /** Responsible for committing the committable to the external system. */
     private final Committer<CommT> committer;
 
@@ -46,21 +42,11 @@ final class StreamingCommitterHandler<CommT>
             Committer<CommT> committer, SimpleVersionedSerializer<CommT> committableSerializer) {
         super(committableSerializer);
         this.committer = checkNotNull(committer);
-        this.recoveredCommittables = new ArrayList<>();
-    }
-
-    @Override
-    void recoveredCommittables(List<CommT> committables) {
-        recoveredCommittables.addAll(checkNotNull(committables));
     }
 
     @Override
     List<CommT> prepareCommit(List<CommT> input) {
-        checkNotNull(input);
-        final List<CommT> result = new ArrayList<>(recoveredCommittables);
-        recoveredCommittables.clear();
-        result.addAll(input);
-        return result;
+        return prependRecoveredCommittables(checkNotNull(input));
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 0495514..8229076 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -59,6 +59,10 @@ public class TestProcessingTimeService implements TimerService {
                         });
     }
 
+    public void advance(long delta) throws Exception {
+        setCurrentTime(this.currentTime + delta);
+    }
+
     public void setCurrentTime(long timestamp) throws Exception {
         this.currentTime = timestamp;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandlerTest.java
index 827525b..33b61dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandlerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -43,16 +44,19 @@ public class BatchCommitterHandlerTest extends TestLogger {
         testHarness.initializeEmptyState();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {
+        final TestSink.RetryOnceCommitter committer = new TestSink.RetryOnceCommitter();
         final OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
-                createTestHarness(new TestSink.AlwaysRetryCommitter());
+                createTestHarness(committer);
 
         testHarness.initializeEmptyState();
         testHarness.open();
         testHarness.processElement(committableRecord("those"));
+        testHarness.processElement(committableRecord("these"));
         testHarness.endInput();
         testHarness.close();
+        assertThat(committer.getCommittedData(), Matchers.contains("those", "these"));
     }
 
     @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrierTest.java
new file mode 100644
index 0000000..2407d09
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class CommitRetrierTest {
+    @Test
+    void testRetry() throws Exception {
+        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
+        CommitRetrier retryer = new CommitRetrier(processingTimeService, committerHandler);
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+
+        committerHandler.addRetries(2);
+        assertThat(committerHandler.needsRetry(), equalTo(true));
+        assertThat(committerHandler.getPendingRetries(), equalTo(2));
+
+        assertThat(retryer.retry(0), equalTo(true));
+        assertThat(committerHandler.getPendingRetries(), equalTo(2));
+
+        assertThat(retryer.retry(1), equalTo(true));
+        assertThat(committerHandler.getPendingRetries(), equalTo(1));
+
+        assertThat(retryer.retry(1), equalTo(false));
+        assertThat(committerHandler.getPendingRetries(), equalTo(0));
+
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+    }
+
+    @Test
+    void testInfiniteRetry() throws Exception {
+        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
+        CommitRetrier retryer = new CommitRetrier(processingTimeService, committerHandler);
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+
+        committerHandler.addRetries(2);
+        assertThat(committerHandler.needsRetry(), equalTo(true));
+        assertThat(committerHandler.getPendingRetries(), equalTo(2));
+
+        retryer.retryIndefinitely();
+        assertThat(committerHandler.getPendingRetries(), equalTo(0));
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+    }
+
+    @Test
+    void testTimedRetry() throws Exception {
+        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+        Clock manualClock = new ManualClock();
+        processingTimeService.setCurrentTime(manualClock.absoluteTimeMillis());
+
+        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
+        CommitRetrier retryer =
+                new CommitRetrier(processingTimeService, committerHandler, manualClock);
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+
+        committerHandler.addRetries(2);
+        retryer.retryWithDelay();
+
+        assertThat(committerHandler.needsRetry(), equalTo(true));
+        assertThat(committerHandler.getPendingRetries(), equalTo(2));
+
+        processingTimeService.advance(CommitRetrier.RETRY_DELAY);
+        assertThat(committerHandler.getPendingRetries(), equalTo(1));
+
+        processingTimeService.advance(CommitRetrier.RETRY_DELAY);
+        assertThat(committerHandler.getPendingRetries(), equalTo(0));
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+
+        processingTimeService.advance(CommitRetrier.RETRY_DELAY);
+        assertThat(committerHandler.getPendingRetries(), equalTo(0));
+        assertThat(committerHandler.needsRetry(), equalTo(false));
+    }
+
+    private static class CommitterHandlerWithRetries extends ForwardCommittingHandler<String> {
+        private AtomicInteger retriesNeeded = new AtomicInteger(0);
+
+        void addRetries(int retries) {
+            retriesNeeded.addAndGet(retries);
+        }
+
+        int getPendingRetries() {
+            return retriesNeeded.get();
+        }
+
+        @Override
+        public boolean needsRetry() {
+            return getPendingRetries() > 0;
+        }
+
+        @Override
+        public void retry() throws IOException, InterruptedException {
+            retriesNeeded.decrementAndGet();
+        }
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
index a3f90b4..30edcaf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -44,16 +45,20 @@ public class GlobalBatchCommitterHandlerTest extends TestLogger {
         testHarness.initializeEmptyState();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {
+        final TestSink.RetryOnceGlobalCommitter globalCommitter =
+                new TestSink.RetryOnceGlobalCommitter();
         final OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
-                createTestHarness(new TestSink.AlwaysRetryGlobalCommitter());
+                createTestHarness(globalCommitter);
 
         testHarness.initializeEmptyState();
         testHarness.open();
         testHarness.processElement(committableRecord("hotel"));
+        testHarness.processElement(committableRecord("motel"));
         testHarness.endInput();
         testHarness.close();
+        assertThat(globalCommitter.getCommittedData(), Matchers.contains("hotel|motel"));
     }
 
     @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandlerTest.java
index 906b2d1..1dec8f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandlerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -58,18 +59,46 @@ public class GlobalStreamingCommitterHandlerTest extends TestLogger {
         testHarness.open();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetryInNextCommit() throws Exception {
         final List<String> input = Arrays.asList("lazy", "leaf");
+        final TestSink.RetryOnceGlobalCommitter globalCommitter =
+                new TestSink.RetryOnceGlobalCommitter();
+        final OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
+                createTestHarness(globalCommitter);
+
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        testHarness.processElements(committableRecords(input));
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(1L);
+        assertThat(globalCommitter.getCommittedData(), Matchers.hasSize(0));
+        // commits delayed by one checkpoint
+        testHarness.snapshot(2L, 2L);
+        testHarness.notifyOfCompletedCheckpoint(2L);
+        assertThat(globalCommitter.getCommittedData(), Matchers.contains("lazy|leaf"));
 
+        testHarness.close();
+    }
+
+    @Test
+    public void supportRetryByTime() throws Exception {
+        final List<String> input = Arrays.asList("lazy", "leaf");
+        final TestSink.RetryOnceGlobalCommitter globalCommitter =
+                new TestSink.RetryOnceGlobalCommitter();
         final OneInputStreamOperatorTestHarness<byte[], byte[]> testHarness =
-                createTestHarness(new TestSink.AlwaysRetryGlobalCommitter());
+                createTestHarness(globalCommitter);
 
         testHarness.initializeEmptyState();
         testHarness.open();
         testHarness.processElements(committableRecords(input));
         testHarness.snapshot(1L, 1L);
         testHarness.notifyOfCompletedCheckpoint(1L);
+        assertThat(globalCommitter.getCommittedData(), Matchers.hasSize(0));
+
+        testHarness.getProcessingTimeService().setCurrentTime(Long.MAX_VALUE);
+
+        assertThat(globalCommitter.getCommittedData(), Matchers.contains("lazy|leaf"));
 
         testHarness.close();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandlerTest.java
index ed74614..86e2300 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandlerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterHandlerTest.java
@@ -51,11 +51,12 @@ public class StreamingCommitterHandlerTest extends TestLogger {
         testHarness.open();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {
         final List<String> input = Arrays.asList("lazy", "leaf");
+        final TestSink.RetryOnceCommitter committer = new TestSink.RetryOnceCommitter();
         final OneInputStreamOperatorTestHarness<String, byte[]> testHarness =
-                createTestHarness(new TestSink.AlwaysRetryCommitter());
+                createTestHarness(committer);
 
         testHarness.initializeEmptyState();
         testHarness.open();
@@ -64,8 +65,17 @@ public class StreamingCommitterHandlerTest extends TestLogger {
         testHarness.prepareSnapshotPreBarrier(1);
         testHarness.snapshot(1L, 1L);
         testHarness.notifyOfCompletedCheckpoint(1L);
+        testHarness.snapshot(2L, 2L);
+        testHarness.notifyOfCompletedCheckpoint(2L);
 
         testHarness.close();
+
+        // committedData has the format (<input>, null, <number>)
+        final List<String> committedInputs =
+                committer.getCommittedData().stream()
+                        .map(s -> s.substring(1, s.length() - 1).split(",")[0].trim())
+                        .collect(Collectors.toList());
+        assertThat(committedInputs, Matchers.contains("lazy", "leaf"));
     }
 
     @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
index ca31a6a..e9f8e9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
@@ -35,9 +35,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -275,7 +277,7 @@ public class TestSink<T> implements Sink<T, String, String, String> {
     /** Base class for testing {@link Committer} and {@link GlobalCommitter}. */
     static class DefaultCommitter implements Committer<String>, Serializable {
 
-        @Nullable private Queue<String> committedData;
+        @Nullable protected Queue<String> committedData;
 
         private boolean isClosed;
 
@@ -321,11 +323,22 @@ public class TestSink<T> implements Sink<T, String, String, String> {
     }
 
     /** A {@link Committer} that commits each committable only on its second attempt. */
-    static class AlwaysRetryCommitter extends DefaultCommitter implements Committer<String> {
+    static class RetryOnceCommitter extends DefaultCommitter implements Committer<String> {
+
+        private final Set<String> seen = new LinkedHashSet<>();
 
         @Override
         public List<String> commit(List<String> committables) {
-            return committables;
+            committables.forEach(
+                    c -> {
+                        if (seen.remove(c)) {
+                            checkNotNull(committedData);
+                            committedData.add(c);
+                        } else {
+                            seen.add(c);
+                        }
+                    });
+            return new ArrayList<>(seen);
         }
     }
 
@@ -380,11 +393,13 @@ public class TestSink<T> implements Sink<T, String, String, String> {
     }
 
     /** A {@link GlobalCommitter} that commits each global committable only on its second attempt. */
-    static class AlwaysRetryGlobalCommitter extends DefaultGlobalCommitter {
+    static class RetryOnceGlobalCommitter extends DefaultGlobalCommitter {
+
+        private final Set<String> seen = new LinkedHashSet<>();
 
         @Override
         public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
-            return Collections.emptyList();
+            return globalCommittables;
         }
 
         @Override
@@ -397,7 +412,16 @@ public class TestSink<T> implements Sink<T, String, String, String> {
 
         @Override
         public List<String> commit(List<String> committables) {
-            return committables;
+            committables.forEach(
+                    c -> {
+                        if (seen.remove(c)) {
+                            checkNotNull(committedData);
+                            committedData.add(c);
+                        } else {
+                            seen.add(c);
+                        }
+                    });
+            return new ArrayList<>(seen);
         }
     }
 

Mime
View raw message