flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dwysakow...@apache.org
Subject [flink] 02/03: [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish
Date Tue, 06 Jul 2021 06:23:13 GMT
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 541f43026203b56c0501a33ed54271270ac3e085
Author: Dawid Wysakowicz <dwysakowicz@apache.org>
AuthorDate: Fri Jun 25 15:32:56 2021 +0200

    [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish
    
    This commit cleans up StreamOperator API in regards to the termination phase and introduces a clean finish() method for flushing all records without releasing resources.
    
    The StreamOperator#close  method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose  method.
    
    This closes #16351
---
 .../flink/state/api/output/BoundedStreamTask.java  |   2 +-
 .../operators/StateBootstrapWrapperOperator.java   |   8 +-
 .../flink/state/api/output/SnapshotUtilsTest.java  |   8 +-
 .../python/AbstractPythonFunctionOperator.java     |  20 ++--
 .../PythonTimestampsAndWatermarksOperator.java     |   4 +-
 ...stractArrowPythonAggregateFunctionOperator.java |   4 +-
 ...tBatchArrowPythonAggregateFunctionOperator.java |   4 +-
 .../RowDataArrowPythonScalarFunctionOperator.java  |  14 +--
 .../source/ContinuousFileReaderOperator.java       | 104 ++++++++++-----------
 .../api/operators/AbstractStreamOperator.java      |  23 +----
 .../api/operators/AbstractStreamOperatorV2.java    |  23 +----
 .../api/operators/AbstractUdfStreamOperator.java   |  13 ---
 .../streaming/api/operators/SourceOperator.java    |  18 +---
 .../streaming/api/operators/StreamOperator.java    |  26 +++---
 .../streaming/api/operators/StreamSource.java      |  22 +++--
 .../api/operators/collect/CollectSinkOperator.java |   4 +-
 .../runtime/operators/GenericWriteAheadSink.java   |   1 +
 .../operators/TimestampsAndWatermarksOperator.java |   4 +-
 .../windowing/EvictingWindowOperator.java          |   6 --
 .../operators/windowing/WindowOperator.java        |   9 --
 .../streaming/runtime/tasks/OperatorChain.java     |   6 +-
 .../runtime/tasks/StreamOperatorWrapper.java       |  60 ++++++------
 .../flink/streaming/runtime/tasks/StreamTask.java  |  33 ++++---
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  23 ++---
 .../api/operators/SourceOperatorTest.java          |  12 ---
 .../ContinuousFileProcessingRescalingTest.java     |   2 +-
 .../StreamSourceOperatorLatencyMetricsTest.java    |   2 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  43 ++-------
 .../runtime/tasks/OneInputStreamTaskTest.java      |   8 +-
 .../runtime/tasks/SourceStreamTaskTest.java        |  15 ++-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  26 +++---
 .../streaming/runtime/tasks/StreamTaskTest.java    |  48 +++++-----
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   4 +-
 .../tasks/TestBoundedOneInputStreamOperator.java   |  10 +-
 .../runtime/tasks/TestBoundedTwoInputOperator.java |  11 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java      |   6 +-
 .../util/AbstractStreamOperatorTestHarness.java    |   6 +-
 .../util/TestBoundedMultipleInputOperator.java     |  74 ---------------
 .../filesystem/stream/AbstractStreamingWriter.java |   4 +-
 .../runtime/operators/TableStreamOperator.java     |  16 ----
 .../window/LocalSlicingWindowAggOperator.java      |  15 ---
 .../bundle/AbstractMapBundleOperator.java          |  38 ++++----
 .../temporal/TemporalProcessTimeJoinOperator.java  |   1 +
 .../join/temporal/TemporalRowTimeJoinOperator.java |   1 +
 .../operators/join/window/WindowJoinOperator.java  |  17 ----
 .../MultipleInputStreamOperatorBase.java           |  18 ++--
 .../runtime/operators/sort/StreamSortOperator.java |   6 +-
 .../runtime/operators/window/WindowOperator.java   |  23 -----
 .../window/slicing/SlicingWindowOperator.java      |  15 ---
 .../ProcTimeMiniBatchAssignerOperator.java         |   5 -
 .../RowTimeMiniBatchAssginerOperator.java          |   4 +-
 .../wmassigners/WatermarkAssignerOperator.java     |  17 +---
 .../BatchMultipleInputStreamOperatorTest.java      |  31 ------
 .../TestingOneInputStreamOperator.java             |  10 --
 .../TestingTwoInputStreamOperator.java             |  10 --
 .../test/streaming/runtime/TimestampITCase.java    |   4 +-
 56 files changed, 314 insertions(+), 627 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index ee30d4f..257bc70 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -106,6 +106,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo
             mainOperator.processElement(streamRecord);
         } else {
             mainOperator.endInput();
+            mainOperator.finish();
             controller.suspendDefaultAction();
             mailboxProcessor.suspend();
         }
@@ -117,7 +118,6 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo
     @Override
     protected void cleanup() throws Exception {
         mainOperator.close();
-        mainOperator.dispose();
     }
 
     private static class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 245a7d4..92eb6d3 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -105,13 +105,13 @@ public final class StateBootstrapWrapperOperator<
     }
 
     @Override
-    public void close() throws Exception {
-        operator.close();
+    public void finish() throws Exception {
+        operator.finish();
     }
 
     @Override
-    public void dispose() throws Exception {
-        operator.dispose();
+    public void close() throws Exception {
+        operator.close();
     }
 
     @Override
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 1178654..8430888 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -74,13 +74,13 @@ public class SnapshotUtilsTest {
         }
 
         @Override
-        public void close() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("close");
+        public void finish() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("finish");
         }
 
         @Override
-        public void dispose() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("dispose");
+        public void close() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("close");
         }
 
         @Override
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 387c14f..7abb6c1 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -139,22 +139,16 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         try {
             invokeFinishBundle();
         } finally {
-            super.close();
-
-            try {
-                cleanUpLeakingClasses(this.getClass().getClassLoader());
-            } catch (Throwable t) {
-                LOG.warn("Failed to clean up the leaking objects.", t);
-            }
+            super.finish();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         try {
             if (checkFinishBundleTimer != null) {
                 checkFinishBundleTimer.cancel(true);
@@ -165,7 +159,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
                 pythonFunctionRunner = null;
             }
         } finally {
-            super.dispose();
+            super.close();
+
+            try {
+                cleanUpLeakingClasses(this.getClass().getClassLoader());
+            } catch (Throwable t) {
+                LOG.warn("Failed to clean up the leaking objects.", t);
+            }
         }
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
index 996aa74..57aea3d 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java
@@ -181,8 +181,8 @@ public class PythonTimestampsAndWatermarksOperator<IN>
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         watermarkGenerator.onPeriodicEmit(watermarkOutput);
     }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index b6fe695..a7c63cf 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -107,8 +107,8 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         if (arrowSerializer != null) {
             arrowSerializer.close();
             arrowSerializer = null;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
index ecbf2a9..4442cbd 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java
@@ -100,9 +100,9 @@ abstract class AbstractBatchArrowPythonAggregateFunctionOperator
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         invokeCurrentBatch();
-        super.close();
+        super.finish();
     }
 
     protected abstract void invokeCurrentBatch() throws Exception;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
index b5193b5..cc2eca4 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
@@ -87,18 +87,18 @@ public class RowDataArrowPythonScalarFunctionOperator
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (arrowSerializer != null) {
-            arrowSerializer.close();
-            arrowSerializer = null;
-        }
+    public void finish() throws Exception {
+        invokeCurrentBatch();
+        super.finish();
     }
 
     @Override
     public void close() throws Exception {
-        invokeCurrentBatch();
         super.close();
+        if (arrowSerializer != null) {
+            arrowSerializer.close();
+            arrowSerializer = null;
+        }
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 0df0be2..5c29092 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -82,8 +82,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <ol>
  *   <li>if {@link ReaderState#IDLE IDLE} then close immediately
- *   <li>otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link
- *       MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#CLOSED CLOSED}
+ *   <li>otherwise switch to {@link ReaderState#FINISHING CLOSING}, call {@link
+ *       MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#FINISHED CLOSED}
  *   <li>{@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be
  *       processed in the same way as above
  * </ol>
@@ -139,7 +139,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
             }
         },
         /**
-         * No further processing can be done; only state disposal transition to {@link #CLOSED}
+         * No further processing can be done; only state disposal transition to {@link #FINISHED}
          * allowed.
          */
         FAILED {
@@ -153,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
          * {@link #close()} was called but unprocessed data (records and splits) remains and needs
          * to be processed. {@link #close()} caller is blocked.
          */
-        CLOSING {
+        FINISHING {
             @Override
             public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
                     ContinuousFileReaderOperator<?, T> op) throws IOException {
@@ -168,10 +168,10 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
                 // need one more mail to unblock possible yield() in close() method (todo: wait with
                 // timeout in yield)
                 op.enqueueProcessRecord();
-                op.switchState(CLOSED);
+                op.switchState(FINISHED);
             }
         },
-        CLOSED {
+        FINISHED {
             @Override
             public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
                     ContinuousFileReaderOperator<?, T> op) {
@@ -186,12 +186,12 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
         static {
             Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
-            tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED, FAILED));
-            tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING, FAILED));
-            tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING, FAILED));
-            tmpTransitions.put(CLOSING, EnumSet.of(CLOSED, FAILED));
-            tmpTransitions.put(FAILED, EnumSet.of(CLOSED));
-            tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+            tmpTransitions.put(IDLE, EnumSet.of(OPENING, FINISHED, FAILED));
+            tmpTransitions.put(OPENING, EnumSet.of(READING, FINISHING, FAILED));
+            tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, FINISHING, FAILED));
+            tmpTransitions.put(FINISHING, EnumSet.of(FINISHED, FAILED));
+            tmpTransitions.put(FAILED, EnumSet.of(FINISHED));
+            tmpTransitions.put(FINISHED, EnumSet.noneOf(ReaderState.class));
             VALID_TRANSITIONS = new EnumMap<>(tmpTransitions);
         }
 
@@ -200,7 +200,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
         }
 
         public final boolean isTerminal() {
-            return this == CLOSED;
+            return this == FINISHED;
         }
 
         public boolean canSwitchTo(ReaderState next) {
@@ -302,7 +302,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
         this.state = ReaderState.IDLE;
         if (this.format instanceof RichInputFormat) {
-            ((RichInputFormat) this.format).setRuntimeContext(getRuntimeContext());
+            ((RichInputFormat<?, ?>) this.format).setRuntimeContext(getRuntimeContext());
         }
         this.format.configure(new Configuration());
 
@@ -380,7 +380,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
     private void readAndCollectRecord() throws IOException {
         Preconditions.checkState(
-                state == ReaderState.READING || state == ReaderState.CLOSING,
+                state == ReaderState.READING || state == ReaderState.FINISHING,
                 "can't process record in state %s",
                 state);
         if (format.reachedEnd()) {
@@ -394,14 +394,14 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
     private void loadSplit(T split) throws IOException {
         Preconditions.checkState(
-                state != ReaderState.READING && state != ReaderState.CLOSED,
+                state != ReaderState.READING && state != ReaderState.FINISHED,
                 "can't load split in state %s",
                 state);
         Preconditions.checkNotNull(split, "split is null");
         LOG.debug("load split: {}", split);
         currentSplit = split;
         if (this.format instanceof RichInputFormat) {
-            ((RichInputFormat) this.format).openInputFormat();
+            ((RichInputFormat<?, ?>) this.format).openInputFormat();
         }
         if (format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
             // recovering after a node failure with an input
@@ -436,14 +436,38 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void finish() throws Exception {
+        LOG.debug("finishing");
+        super.finish();
+
+        switch (state) {
+            case IDLE:
+                switchState(ReaderState.FINISHED);
+                break;
+            case FINISHED:
+                LOG.warn("operator is already closed, doing nothing");
+                return;
+            default:
+                switchState(ReaderState.FINISHING);
+                while (!state.isTerminal()) {
+                    executor.yield();
+                }
+        }
+
+        try {
+            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+        } catch (Exception e) {
+            LOG.warn("unable to emit watermark while closing", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
         Exception e = null;
-        if (state != ReaderState.CLOSED) {
-            try {
-                cleanUp();
-            } catch (Exception ex) {
-                e = ex;
-            }
+        try {
+            cleanUp();
+        } catch (Exception ex) {
+            e = ex;
         }
         {
             checkpointedState = null;
@@ -457,7 +481,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
             splits = null;
         }
         try {
-            super.dispose();
+            super.close();
         } catch (Exception ex) {
             e = ExceptionUtils.firstOrSuppressed(ex, e);
         }
@@ -466,34 +490,6 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
         }
     }
 
-    @Override
-    public void close() throws Exception {
-        LOG.debug("closing");
-        super.close();
-
-        switch (state) {
-            case IDLE:
-                switchState(ReaderState.CLOSED);
-                break;
-            case CLOSED:
-                LOG.warn("operator is already closed, doing nothing");
-                return;
-            default:
-                switchState(ReaderState.CLOSING);
-                while (!state.isTerminal()) {
-                    executor.yield();
-                }
-        }
-
-        try {
-            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
-        } catch (Exception e) {
-            LOG.warn("unable to emit watermark while closing", e);
-        }
-
-        cleanUp();
-    }
-
     private void cleanUp() throws Exception {
         LOG.debug("cleanup, state={}", state);
 
@@ -502,7 +498,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
             () -> format.close(),
             () -> {
                 if (this.format instanceof RichInputFormat) {
-                    ((RichInputFormat) this.format).closeInputFormat();
+                    ((RichInputFormat<?, ?>) this.format).closeInputFormat();
                 }
             }
         };
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 23da8de..15faaf7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -322,30 +322,11 @@ public abstract class AbstractStreamOperator<OUT>
     @Override
     public void open() throws Exception {}
 
-    /**
-     * This method is called after all records have been added to the operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
 
-    /**
-     * This method is called at the very end of the operator's life, both in the case of a
-     * successful completion of the operation, and in the case of a failure and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         if (stateHandler != null) {
             stateHandler.dispose();
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 702b753..28b2c44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -266,30 +266,11 @@ public abstract class AbstractStreamOperatorV2<OUT>
     @Override
     public void open() throws Exception {}
 
-    /**
-     * This method is called after all records have been added to the operators via the methods
-     * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link
-     * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link
-     * TwoInputStreamOperator#processElement2(StreamRecord)}.
-     *
-     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
-     * flushing of buffered should be propagated, in order to cause the operation to be recognized
-     * asa failed, because the last data items are not processed properly.
-     *
-     * @throws Exception An exception in this method causes the operator to fail.
-     */
     @Override
-    public void close() throws Exception {}
+    public void finish() throws Exception {}
 
-    /**
-     * This method is called at the very end of the operator's life, both in the case of a
-     * successful completion of the operation, and in the case of a failure and canceling.
-     *
-     * <p>This method is expected to make a thorough effort to release all resources that the
-     * operator has acquired.
-     */
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         if (stateHandler != null) {
             stateHandler.dispose();
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1ece8e7..d41eb95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -53,9 +53,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
     /** The user function. */
     protected final F userFunction;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     public AbstractUdfStreamOperator(F userFunction) {
         this.userFunction = requireNonNull(userFunction);
         checkUdfCheckpointingPreconditions();
@@ -105,19 +102,9 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
     @Override
     public void close() throws Exception {
         super.close();
-        functionsClosed = true;
         FunctionUtils.closeFunction(userFunction);
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (!functionsClosed) {
-            functionsClosed = true;
-            FunctionUtils.closeFunction(userFunction);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  checkpointing and recovery
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 835ad5a..cc2fc60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -127,9 +127,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
      */
     private TimestampsAndWatermarks<OUT> eventTimeLogic;
 
-    /** Indicating whether the source operator has been closed. */
-    private boolean closed;
-
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
                     readerFactory,
@@ -263,24 +260,19 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         if (eventTimeLogic != null) {
             eventTimeLogic.stopPeriodicWatermarkEmits();
         }
-        if (sourceReader != null) {
-            sourceReader.close();
-        }
-        closed = true;
-        super.close();
+        super.finish();
     }
 
     @Override
-    public void dispose() throws Exception {
-        // We also need to close the source reader to make sure the resources
-        // are released if the task does not finish normally.
-        if (!closed && sourceReader != null) {
+    public void close() throws Exception {
+        if (sourceReader != null) {
             sourceReader.close();
         }
+        super.close();
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index c4a90be..7c522fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Disposable;
 
 import java.io.Serializable;
 
@@ -44,8 +43,7 @@ import java.io.Serializable;
  * @param <OUT> The output type of the operator
  */
 @PublicEvolving
-public interface StreamOperator<OUT>
-        extends CheckpointListener, KeyContext, Disposable, Serializable {
+public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
 
     // ------------------------------------------------------------------------
     //  life cycle
@@ -64,21 +62,21 @@ public interface StreamOperator<OUT>
     void open() throws Exception;
 
     /**
-     * This method is called after all records have been added to the operators via the methods
-     * {@link
-     * org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)},
-     * or {@link
-     * org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)}
-     * and {@link
-     * org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+     * This method is called at the end of data processing.
      *
      * <p>The method is expected to flush all remaining buffered data. Exceptions during this
      * flushing of buffered should be propagated, in order to cause the operation to be recognized
      * as failed, because the last data items are not processed properly.
      *
+     * <p><b>After this method is called, no more records can be produced for the downstream
+     * operators.</b>
+     *
+     * <p><b>NOTE:</b>This method does not need to close any resources. You should release external
+     * resources in the {@link #close()} method.
+     *
      * @throws java.lang.Exception An exception in this method causes the operator to fail.
      */
-    void close() throws Exception;
+    void finish() throws Exception;
 
     /**
      * This method is called at the very end of the operator's life, both in the case of a
@@ -86,9 +84,11 @@ public interface StreamOperator<OUT>
      *
      * <p>This method is expected to make a thorough effort to release all resources that the
      * operator has acquired.
+     *
+     * <p><b>NOTE:</b>It can not emit any records! If you need to emit records at the end of
+     * processing, do so in the {@link #finish()} method.
      */
-    @Override
-    void dispose() throws Exception;
+    void close() throws Exception;
 
     // ------------------------------------------------------------------------
     //  state snapshots
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 4957049..fef7f28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -132,18 +132,20 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
     }
 
     @Override
+    public void finish() throws Exception {
+        super.finish();
+        if (!isCanceledOrStopped() && ctx != null) {
+            advanceToEndOfEventTime();
+        }
+    }
+
+    @Override
     public void close() throws Exception {
-        try {
-            super.close();
-            if (!isCanceledOrStopped() && ctx != null) {
-                advanceToEndOfEventTime();
-            }
-        } finally {
-            // make sure that the context is closed in any case
-            if (ctx != null) {
-                ctx.close();
-            }
+        // make sure that the context is closed in any case
+        if (ctx != null) {
+            ctx.close();
         }
+        super.close();
     }
 
     public void cancel() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
index 5c84c9b..8b78bc3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
@@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> implements OperatorE
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.close();
+        super.finish();
     }
 
     public CompletableFuture<OperatorID> getOperatorIdFuture() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4d007bb..4627c7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -137,6 +137,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 
     public void close() throws Exception {
         committer.close();
+        super.close();
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index b2d3501..2e4a7be 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -127,8 +127,8 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         watermarkGenerator.onPeriodicEmit(wmOutput);
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24fbd31..16e7aac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -485,12 +485,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
         evictorContext = null;
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        evictorContext = null;
-    }
-
     // ------------------------------------------------------------------------
     // Getters for testing
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index f3f30a4..57bbfd1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -288,15 +288,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        timestampedCollector = null;
-        triggerContext = null;
-        processContext = null;
-        windowAssignerContext = null;
-    }
-
-    @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
         final Collection<W> elementWindows =
                 windowAssigner.assignWindows(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 5bfef4e..d1a35b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -406,7 +406,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
     /**
      * Initialize state and open all operators in the chain from <b>tail to heads</b>, contrary to
      * {@link StreamOperator#close()} which happens <b>heads to tail</b> (see {@link
-     * #closeOperators(StreamTaskActionExecutor)}).
+     * #finishOperators(StreamTaskActionExecutor)}).
      */
     protected void initializeStateAndOpenOperators(
             StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
@@ -422,9 +422,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
      * operator in the chain, contrary to {@link StreamOperator#open()} which happens <b>tail to
      * heads</b> (see {@link #initializeStateAndOpenOperators(StreamTaskStateInitializer)}).
      */
-    protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
+    protected void finishOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
         if (firstOperatorWrapper != null) {
-            firstOperatorWrapper.close(actionExecutor, ignoreEndOfInput);
+            firstOperatorWrapper.finish(actionExecutor, ignoreEndOfInput);
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 401bc89..1b29fbc 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It
  * also automatically propagates the close operation to the next wrapper that the {@link #next}
  * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
- * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method
+ * close all operators only by calling the {@link #finish(StreamTaskActionExecutor, boolean)} method
  * of the header operator wrapper.
  */
 @Internal
@@ -120,7 +120,7 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
      * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
      * them.
      */
-    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
+    public void finish(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
             throws Exception {
         if (!isHead && !isStoppingBySyncSavepoint) {
             // NOTE: This only do for the case where the operator is one-input operator. At present,
@@ -128,68 +128,63 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
             actionExecutor.runThrowing(() -> endOperatorInput(1));
         }
 
-        quiesceTimeServiceAndCloseOperator(actionExecutor);
+        quiesceTimeServiceAndFinishOperator(actionExecutor);
 
         // propagate the close operation to the next wrapper
         if (next != null) {
-            next.close(actionExecutor, isStoppingBySyncSavepoint);
+            next.finish(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
-    private void quiesceTimeServiceAndCloseOperator(StreamTaskActionExecutor actionExecutor)
+    private void quiesceTimeServiceAndFinishOperator(StreamTaskActionExecutor actionExecutor)
             throws InterruptedException, ExecutionException {
 
         // step 1. to ensure that there is no longer output triggered by the timers before invoking
-        // the "close()"
-        //         method of the operator, we quiesce the processing time service to prevent the
-        // pending timers
-        //         from firing, but wait the timers in running to finish
-        // step 2. invoke the "close()" method of the operator. executing the close operation must
-        // be deferred
-        //         to the mailbox to ensure that mails already in the mailbox are finished before
-        // closing the
-        //         operator
+        // the "finish()" method of the operator, we quiesce the processing time service to prevent
+        // the pending timers from firing, but wait the timers in running to finish
+        // step 2. invoke the "finish()" method of the operator. executing the close operation must
+        // be deferred to the mailbox to ensure that mails already in the mailbox are finished
+        // before closing the operator
         // step 3. send a closed mail to ensure that the mails that are from the operator and still
-        // in the mailbox
-        //         are completed before exiting the following mailbox processing loop
-        CompletableFuture<Void> closedFuture =
+        // in the mailbox are completed before exiting the following mailbox processing loop
+        CompletableFuture<Void> finishedFuture =
                 quiesceProcessingTimeService()
-                        .thenCompose(unused -> deferCloseOperatorToMailbox(actionExecutor))
-                        .thenCompose(unused -> sendClosedMail());
+                        .thenCompose(unused -> deferFinishOperatorToMailbox(actionExecutor))
+                        .thenCompose(unused -> sendFinishedMail());
 
         // run the mailbox processing loop until all operations are finished
-        while (!closedFuture.isDone()) {
+        while (!finishedFuture.isDone()) {
             while (mailboxExecutor.tryYield()) {}
 
             // we wait a little bit to avoid unnecessary CPU occupation due to empty loops,
             // such as when all mails of the operator have been processed but the closed future
             // has not been set to completed state
             try {
-                closedFuture.get(1, TimeUnit.MILLISECONDS);
+                finishedFuture.get(1, TimeUnit.MILLISECONDS);
             } catch (TimeoutException ex) {
                 // do nothing
             }
         }
 
         // expose the exception thrown when closing
-        closedFuture.get();
+        finishedFuture.get();
     }
 
-    private CompletableFuture<Void> deferCloseOperatorToMailbox(
+    private CompletableFuture<Void> deferFinishOperatorToMailbox(
             StreamTaskActionExecutor actionExecutor) {
-        final CompletableFuture<Void> closeOperatorFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> finishOperatorFuture = new CompletableFuture<>();
 
         mailboxExecutor.execute(
                 () -> {
                     try {
-                        closeOperator(actionExecutor);
-                        closeOperatorFuture.complete(null);
+                        finishOperator(actionExecutor);
+                        finishOperatorFuture.complete(null);
                     } catch (Throwable t) {
-                        closeOperatorFuture.completeExceptionally(t);
+                        finishOperatorFuture.completeExceptionally(t);
                     }
                 },
                 "StreamOperatorWrapper#closeOperator for " + wrapped);
-        return closeOperatorFuture;
+        return finishOperatorFuture;
     }
 
     private CompletableFuture<Void> quiesceProcessingTimeService() {
@@ -198,19 +193,20 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
                 .orElse(CompletableFuture.completedFuture(null));
     }
 
-    private CompletableFuture<Void> sendClosedMail() {
+    private CompletableFuture<Void> sendFinishedMail() {
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
         mailboxExecutor.execute(
-                () -> future.complete(null), "StreamOperatorWrapper#sendClosedMail for " + wrapped);
+                () -> future.complete(null),
+                "StreamOperatorWrapper#sendFinishedMail for " + wrapped);
         return future;
     }
 
-    private void closeOperator(StreamTaskActionExecutor actionExecutor) throws Exception {
+    private void finishOperator(StreamTaskActionExecutor actionExecutor) throws Exception {
         actionExecutor.runThrowing(
                 () -> {
                     closed = true;
-                    wrapped.close();
+                    wrapped.finish();
                 });
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a0326981..8ff3446 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -236,7 +236,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
      */
     private volatile boolean failing;
 
-    private boolean disposedOperators;
+    private boolean closedOperators;
 
     /** Thread pool for async snapshot workers. */
     private final ExecutorService asyncOperationsThreadPool;
@@ -506,7 +506,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     /**
      * Instructs the task to go through its normal termination routine, i.e. exit the run-loop and
-     * call {@link StreamOperator#close()} and {@link StreamOperator#dispose()} on its operators.
+     * call {@link StreamOperator#finish()} and {@link StreamOperator#close()} on its operators.
      *
      * <p>This is used by the source task to get out of the run-loop when the job is stopped with a
      * savepoint.
@@ -552,7 +552,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             LOG.debug("Re-restore attempt rejected.");
             return;
         }
-        disposedOperators = false;
+        closedOperators = false;
         LOG.debug("Initializing {}.", getName());
 
         operatorChain = new OperatorChain<>(this, recordWriter);
@@ -698,7 +698,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
 
         // close all operators in a chain effect way
-        operatorChain.closeOperators(actionExecutor);
+        operatorChain.finishOperators(actionExecutor);
 
         // If checkpoints are enabled, waits for all the records get processed by the downstream
         // tasks. During this process, this task could coordinate with its downstream tasks to
@@ -758,7 +758,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
         // make an attempt to dispose the operators such that failures in the dispose call
         // still let the computation fail
-        disposeAllOperators();
+        closeAllOperators();
     }
 
     protected void cleanUpInvoke() throws Exception {
@@ -789,8 +789,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         suppressedException = runAndSuppressThrowable(this::cleanup, suppressedException);
 
         // if the operators were not disposed before, do a hard dispose
-        suppressedException =
-                runAndSuppressThrowable(this::disposeAllOperators, suppressedException);
+        suppressedException = runAndSuppressThrowable(this::closeAllOperators, suppressedException);
 
         // release the output resources. this method should never fail.
         suppressedException =
@@ -888,24 +887,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     }
 
     /**
-     * Execute @link StreamOperator#dispose()} of each operator in the chain of this {@link
-     * StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
+     * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link
+     * StreamTask}. Closing happens from <b>tail to head</b> operator in the chain.
      */
-    private void disposeAllOperators() throws Exception {
-        if (operatorChain != null && !disposedOperators) {
-            Exception disposalException = null;
+    private void closeAllOperators() throws Exception {
+        if (operatorChain != null && !closedOperators) {
+            Exception closingException = null;
             for (StreamOperatorWrapper<?, ?> operatorWrapper :
                     operatorChain.getAllOperators(true)) {
                 StreamOperator<?> operator = operatorWrapper.getStreamOperator();
                 try {
-                    operator.dispose();
+                    operator.close();
                 } catch (Exception e) {
-                    disposalException = firstOrSuppressed(e, disposalException);
+                    closingException = firstOrSuppressed(e, closingException);
                 }
             }
-            disposedOperators = true;
-            if (disposalException != null) {
-                throw disposalException;
+            closedOperators = true;
+            if (closingException != null) {
+                throw closingException;
             }
         }
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 8697353..6daaee8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -67,9 +67,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                     "UDF::run",
                     "OPERATOR::prepareSnapshotPreBarrier",
                     "OPERATOR::snapshotState",
+                    "OPERATOR::finish",
                     "OPERATOR::close",
-                    "UDF::close",
-                    "OPERATOR::dispose");
+                    "UDF::close");
 
     private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING =
             Arrays.asList(
@@ -82,13 +82,13 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                     "UDF::run",
                     "OPERATOR::cancel",
                     "UDF::cancel",
-                    "OPERATOR::dispose",
+                    "OPERATOR::close",
                     "UDF::close");
 
     private static final String ALL_METHODS_STREAM_OPERATOR =
             "["
                     + "close[], "
-                    + "dispose[], "
+                    + "finish[], "
                     + "getCurrentKey[], "
                     + "getMetricGroup[], "
                     + "getOperatorID[], "
@@ -320,6 +320,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
         }
 
         @Override
+        public void finish() throws Exception {
+            ACTUAL_ORDER_TRACKING.add("OPERATOR::finish");
+            super.finish();
+        }
+
+        @Override
         public void close() throws Exception {
             ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
             super.close();
@@ -330,14 +336,5 @@ public class AbstractUdfStreamOperatorLifecycleTest {
             ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
             super.cancel();
         }
-
-        @Override
-        public void dispose() throws Exception {
-            ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose");
-            super.dispose();
-            if (simulateCheckpointing) {
-                testCheckpointer.join();
-            }
-        }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 7c835f0..a518c33 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -95,7 +95,6 @@ public class SourceOperatorTest {
     @After
     public void cleanUp() throws Exception {
         operator.close();
-        operator.dispose();
         assertTrue(mockSourceReader.isClosed());
     }
 
@@ -187,17 +186,6 @@ public class SourceOperatorTest {
         assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0));
     }
 
-    @Test
-    public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception {
-        // Initialize the operator.
-        operator.initializeState(getStateContext());
-        // Open the operator.
-        operator.open();
-        operator.close();
-        operator.dispose();
-        assertEquals(1, mockSourceReader.getTimesClosed());
-    }
-
     // ---------------- helper methods -------------------------
 
     private StateInitializationContext getStateContext() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
index 64405d4..70a8bb7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -294,7 +294,7 @@ public class ContinuousFileProcessingRescalingTest {
             while (!getFormat().isLastProcessed()) {
                 mailboxProcessor.runMailboxStep();
             }
-            getHarness().close();
+            harness.getOperator().finish();
         }
 
         @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index 597d7fb..3e80b1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -190,7 +190,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
                                 new MockEnvironmentBuilder().build()));
         try {
             operator.run(new Object(), new CollectorOutput<>(output), operatorChain);
-            operator.close();
+            operator.finish();
         } finally {
             operatorChain.close();
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 71be94f..5be8ae25 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -73,7 +73,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator;
-import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.SerializedValue;
 
@@ -473,10 +472,11 @@ public class MultipleInputStreamTaskTest {
                         LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
                         LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
                         LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT,
-                        LifeCycleTrackingMockSourceReader.CLOSE,
-                        LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE,
+                        LifeCycleTrackingMapToStringMultipleInputOperator.FINISH,
                         LifeCycleTrackingMap.END_INPUT,
-                        LifeCycleTrackingMap.CLOSE));
+                        LifeCycleTrackingMap.CLOSE,
+                        LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE,
+                        LifeCycleTrackingMockSourceReader.CLOSE));
     }
 
     @Test
@@ -999,35 +999,6 @@ public class MultipleInputStreamTaskTest {
         }
     }
 
-    private static class TestBoundedMultipleInputOperatorFactory
-            extends AbstractStreamOperatorFactory<String> {
-        @Override
-        public <T extends StreamOperator<String>> T createStreamOperator(
-                StreamOperatorParameters<String> parameters) {
-            return (T) new TestBoundedMultipleInputOperator("Operator0", parameters);
-        }
-
-        @Override
-        public Class<? extends StreamOperator<String>> getStreamOperatorClass(
-                ClassLoader classLoader) {
-            return TestBoundedMultipleInputOperator.class;
-        }
-    }
-
-    private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory<String> {
-        @Override
-        public <T extends StreamOperator<String>> T createStreamOperator(
-                StreamOperatorParameters<String> parameters) {
-            return (T) new DuplicatingOperator(parameters);
-        }
-
-        @Override
-        public Class<? extends StreamOperator<String>> getStreamOperatorClass(
-                ClassLoader classLoader) {
-            return DuplicatingOperator.class;
-        }
-    }
-
     /** Factory for {@link MapToStringMultipleInputOperator}. */
     protected static class MapToStringMultipleInputOperatorFactory
             extends AbstractStreamOperatorFactory<String> {
@@ -1138,6 +1109,7 @@ public class MultipleInputStreamTaskTest {
             extends MapToStringMultipleInputOperator implements BoundedMultiInput {
         public static final String OPEN = "MultipleInputOperator#open";
         public static final String CLOSE = "MultipleInputOperator#close";
+        public static final String FINISH = "MultipleInputOperator#finish";
         public static final String END_INPUT = "MultipleInputOperator#endInput";
 
         private static final long serialVersionUID = 1L;
@@ -1163,6 +1135,11 @@ public class MultipleInputStreamTaskTest {
         public void endInput(int inputId) {
             LIFE_CYCLE_EVENTS.add(END_INPUT);
         }
+
+        @Override
+        public void finish() throws Exception {
+            LIFE_CYCLE_EVENTS.add(FINISH);
+        }
     }
 
     static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index c845e1d..c5c653b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -714,9 +714,11 @@ public class OneInputStreamTaskTest extends TestLogger {
                 expected,
                 new StreamRecord<>("Hello"),
                 new StreamRecord<>("[Operator0]: End of input"),
-                new StreamRecord<>("[Operator0]: Bye"),
+                new StreamRecord<>("[Operator0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Operator0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
@@ -733,7 +735,7 @@ public class OneInputStreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
 
             // verify that the timer service is still running
             Assert.assertTrue(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 607d848..287f86f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -267,9 +267,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                 expected,
                 new StreamRecord<>("Hello"),
                 new StreamRecord<>("[Source0]: End of input"),
-                new StreamRecord<>("[Source0]: Bye"),
+                new StreamRecord<>("[Source0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Source0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
@@ -311,6 +313,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         }
 
         expectedOutput.add(new StreamRecord<>("Hello"));
+        expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));
 
         TestHarnessUtil.assertOutputEquals(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -919,12 +922,18 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             ProcessingTimeService timeService = getProcessingTimeService();
             timeService.registerTimer(
                     timeService.getCurrentProcessingTime(),
                     t -> output("[" + name + "]: Timer registered in close"));
 
+            output("[" + name + "]: Finish");
+            super.finish();
+        }
+
+        @Override
+        public void close() throws Exception {
             output("[" + name + "]: Bye");
             super.close();
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index 640efe0..7725f3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -131,9 +131,9 @@ public class StreamOperatorWrapperTest extends TestLogger {
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testFinish() throws Exception {
         output.clear();
-        operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);
+        operatorWrappers.get(0).finish(containingTask.getActionExecutor(), false);
 
         List<Object> expected = new ArrayList<>();
         for (int i = 0; i < operatorWrappers.size(); i++) {
@@ -143,7 +143,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
                     prefix + ": End of input",
                     prefix + ": Timer that was in mailbox before closing operator",
                     prefix + ": Bye",
-                    prefix + ": Mail to put in mailbox when closing operator");
+                    prefix + ": Mail to put in mailbox when finishing operator");
         }
 
         assertArrayEquals(
@@ -153,12 +153,12 @@ public class StreamOperatorWrapperTest extends TestLogger {
     }
 
     @Test
-    public void testClosingOperatorWithException() {
-        AbstractStreamOperator streamOperator =
+    public void testFinishingOperatorWithException() {
+        AbstractStreamOperator<Void> streamOperator =
                 new AbstractStreamOperator<Void>() {
                     @Override
-                    public void close() throws Exception {
-                        throw new Exception("test exception at closing");
+                    public void finish() throws Exception {
+                        throw new Exception("test exception at finishing");
                     }
                 };
 
@@ -172,11 +172,11 @@ public class StreamOperatorWrapperTest extends TestLogger {
                         true);
 
         try {
-            operatorWrapper.close(containingTask.getActionExecutor(), false);
+            operatorWrapper.finish(containingTask.getActionExecutor(), false);
             fail("should throw an exception");
         } catch (Throwable t) {
             Optional<Throwable> optional =
-                    ExceptionUtils.findThrowableWithMessage(t, "test exception at closing");
+                    ExceptionUtils.findThrowableWithMessage(t, "test exception at finishing");
             assertTrue(optional.isPresent());
         }
     }
@@ -313,20 +313,22 @@ public class StreamOperatorWrapperTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             ProcessingTimeCallback callback =
                     t1 ->
                             output.add(
                                     "["
                                             + name
-                                            + "]: Timer to put in mailbox when closing operator");
+                                            + "]: Timer to put in mailbox when finishing operator");
             assertNotNull(processingTimeService.registerTimer(0, callback));
             assertNull(timerMailController.getPuttingLatch(callback));
 
             mailboxExecutor.submit(
                     () ->
                             output.add(
-                                    "[" + name + "]: Mail to put in mailbox when closing operator"),
+                                    "["
+                                            + name
+                                            + "]: Mail to put in mailbox when finishing operator"),
                     "");
 
             output.add("[" + name + "]: Bye");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f2123b7..78356a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -371,7 +371,7 @@ public class StreamTaskTest extends TestLogger {
                 throw ex;
             }
             if (!(ex.getCause().getSuppressed()[0]
-                    instanceof FailingTwiceOperator.DisposeException)) {
+                    instanceof FailingTwiceOperator.CloseException)) {
                 throw ex;
             }
         }
@@ -387,13 +387,13 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void dispose() throws Exception {
-            throw new DisposeException();
+        public void close() throws Exception {
+            throw new CloseException();
         }
 
-        class DisposeException extends Exception {
-            public DisposeException() {
-                super("Dispose Exception. This exception should be suppressed");
+        static class CloseException extends Exception {
+            public CloseException() {
+                super("Close Exception. This exception should be suppressed");
             }
         }
     }
@@ -1143,10 +1143,10 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testOperatorClosingBeforeStopRunning() throws Throwable {
-        BlockingCloseStreamOperator.resetLatches();
+        BlockingFinishStreamOperator.resetLatches();
         Configuration taskConfiguration = new Configuration();
         StreamConfig streamConfig = new StreamConfig(taskConfiguration);
-        streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
+        streamConfig.setStreamOperator(new BlockingFinishStreamOperator());
         streamConfig.setOperatorID(new OperatorID());
 
         try (MockEnvironment mockEnvironment =
@@ -1158,16 +1158,16 @@ public class StreamTaskTest extends TestLogger {
                         .setTaskConfiguration(taskConfiguration)
                         .build()) {
 
-            RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task =
+            RunningTask<StreamTask<Void, BlockingFinishStreamOperator>> task =
                     runTask(() -> new NoOpStreamTask<>(mockEnvironment));
 
-            BlockingCloseStreamOperator.inClose.await();
+            BlockingFinishStreamOperator.inClose.await();
 
             // check that the StreamTask is not yet in isRunning == false
             assertTrue(task.streamTask.isRunning());
 
             // let the operator finish its close operation
-            BlockingCloseStreamOperator.finishClose.trigger();
+            BlockingFinishStreamOperator.finishClose.trigger();
 
             task.waitForTaskCompletion(false);
 
@@ -1184,7 +1184,7 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testNotifyCheckpointOnClosedOperator() throws Throwable {
-        ClosingOperator operator = new ClosingOperator();
+        ClosingOperator<Integer> operator = new ClosingOperator<>();
         StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
@@ -1198,15 +1198,15 @@ public class StreamTaskTest extends TestLogger {
 
         harness.streamTask.notifyCheckpointCompleteAsync(1);
         harness.streamTask.runMailboxStep();
-        assertEquals(1, operator.notified.get());
-        assertEquals(false, operator.closed.get());
+        assertEquals(1, ClosingOperator.notified.get());
+        assertFalse(ClosingOperator.closed.get());
 
         // close operators directly, so that task is still fully running
-        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
+        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor());
         harness.streamTask.notifyCheckpointCompleteAsync(2);
         harness.streamTask.runMailboxStep();
-        assertEquals(1, operator.notified.get());
-        assertEquals(true, operator.closed.get());
+        assertEquals(1, ClosingOperator.notified.get());
+        assertTrue(ClosingOperator.closed.get());
     }
 
     @Test
@@ -1247,7 +1247,7 @@ public class StreamTaskTest extends TestLogger {
      */
     @Test
     public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
-        ClosingOperator operator = new ClosingOperator();
+        ClosingOperator<Integer> operator = new ClosingOperator<>();
         StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
@@ -1258,8 +1258,8 @@ public class StreamTaskTest extends TestLogger {
         harness.setAutoProcess(false);
         harness.processElement(new StreamRecord<>(1));
 
-        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
-        assertEquals(true, operator.closed.get());
+        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor());
+        assertTrue(ClosingOperator.closed.get());
 
         harness.streamTask.triggerCheckpointOnBarrier(
                 new CheckpointMetaData(1, 0),
@@ -2057,14 +2057,14 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
+    private static class BlockingFinishStreamOperator extends AbstractStreamOperator<Void> {
         private static final long serialVersionUID = -9042150529568008847L;
 
         private static volatile OneShotLatch inClose;
         private static volatile OneShotLatch finishClose;
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             checkLatches();
             inClose.trigger();
             finishClose.await();
@@ -2673,7 +2673,7 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void finish() throws Exception {
             super.close();
             closed.set(true);
         }
@@ -2769,7 +2769,7 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void dispose() throws Exception {
+        public void close() throws Exception {
             wasClosed = true;
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index de9c703..f023e10 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -547,10 +547,10 @@ public class SubtaskCheckpointCoordinatorTest {
         public void open() throws Exception {}
 
         @Override
-        public void close() throws Exception {}
+        public void finish() throws Exception {}
 
         @Override
-        public void dispose() {}
+        public void close() throws Exception {}
 
         @Override
         public void prepareSnapshotPreBarrier(long checkpointId) {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
index b5f523d..05673c0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
@@ -53,12 +53,18 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         ProcessingTimeService timeService = getProcessingTimeService();
         timeService.registerTimer(
                 timeService.getCurrentProcessingTime(),
-                t -> output("[" + name + "]: Timer registered in close"));
+                t -> output("[" + name + "]: Timer registered in finish"));
+
+        output("[" + name + "]: Finish");
+        super.finish();
+    }
 
+    @Override
+    public void close() throws Exception {
         output("[" + name + "]: Bye");
         super.close();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
index 5edd4fe..8dd49d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java
@@ -51,14 +51,19 @@ public class TestBoundedTwoInputOperator extends AbstractStreamOperator<String>
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         ProcessingTimeService timeService = getProcessingTimeService();
         timeService.registerTimer(
                 timeService.getCurrentProcessingTime(),
                 t -> output("[" + name + "]: Timer registered in close"));
 
-        output.collect(new StreamRecord<>("[" + name + "]: Bye"));
-        super.close();
+        output.collect(new StreamRecord<>("[" + name + "]: Finish"));
+        super.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        output("[" + name + "]: Bye");
     }
 
     private void output(String record) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 6e1fb62..d1ca9fc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -710,9 +710,11 @@ public class TwoInputStreamTaskTest {
                 new StreamRecord<>("[Operator0-1]: End of input"),
                 new StreamRecord<>("[Operator0-2]: Hello-2"),
                 new StreamRecord<>("[Operator0-2]: End of input"),
-                new StreamRecord<>("[Operator0]: Bye"),
+                new StreamRecord<>("[Operator0]: Finish"),
                 new StreamRecord<>("[Operator1]: End of input"),
-                new StreamRecord<>("[Operator1]: Bye"));
+                new StreamRecord<>("[Operator1]: Finish"),
+                new StreamRecord<>("[Operator1]: Bye"),
+                new StreamRecord<>("[Operator0]: Bye"));
 
         final Object[] output = testHarness.getOutput().toArray();
         assertArrayEquals("Output was not correct.", expected.toArray(), output);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 3663052..95ad25e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -686,14 +686,14 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
         operator.notifyCheckpointComplete(checkpointId);
     }
 
-    /** Calls close and dispose on the operator. */
+    /** Calls finish and close on the operator. */
     public void close() throws Exception {
-        operator.close();
-        operator.dispose();
         if (processingTimeService != null) {
             processingTimeService.shutdownService();
         }
         setupCalled = false;
+        operator.finish();
+        operator.close();
 
         if (internalEnvironment.isPresent()) {
             internalEnvironment.get().close();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
deleted file mode 100644
index 6db4adf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.operators.AbstractInput;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
-import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** A test operator class implementing {@link BoundedMultiInput}. */
-public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<String>
-        implements MultipleInputStreamOperator<String>, BoundedMultiInput {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String name;
-
-    public TestBoundedMultipleInputOperator(
-            String name, StreamOperatorParameters<String> parameters) {
-        super(parameters, 3);
-        this.name = name;
-    }
-
-    @Override
-    public List<Input> getInputs() {
-        return Arrays.asList(
-                new TestInput(this, 1), new TestInput(this, 2), new TestInput(this, 3));
-    }
-
-    @Override
-    public void endInput(int inputId) {
-        output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: End of input"));
-    }
-
-    @Override
-    public void close() throws Exception {
-        output.collect(new StreamRecord<>("[" + name + "]: Bye"));
-        super.close();
-    }
-
-    class TestInput extends AbstractInput<String, String> {
-        public TestInput(AbstractStreamOperatorV2<String> owner, int inputId) {
-            super(owner, inputId);
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) throws Exception {
-            output.collect(
-                    element.replace("[" + name + "-" + inputId + "]: " + element.getValue()));
-        }
-    }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
index 73f6f1f..2a96c62 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
@@ -159,8 +159,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         if (helper != null) {
             helper.close();
         }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
index b94da3d..ef67090 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
@@ -40,8 +40,6 @@ public abstract class TableStreamOperator<OUT> extends AbstractStreamOperator<OU
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
     protected long currentWatermark = Long.MIN_VALUE;
 
-    private volatile boolean closed = false;
-
     protected transient ContextImpl ctx;
 
     public TableStreamOperator() {
@@ -55,20 +53,6 @@ public abstract class TableStreamOperator<OUT> extends AbstractStreamOperator<OU
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
-        closed = true;
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        if (!closed) {
-            close();
-        }
-        super.dispose();
-    }
-
-    @Override
     public void processWatermark(Watermark mark) throws Exception {
         super.processWatermark(mark);
         currentWatermark = mark.getTimestamp();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 63f184a..fbdfe48 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -66,9 +66,6 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     /** current watermark of this operator. */
     private transient long currentWatermark;
 
@@ -94,7 +91,6 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
@@ -142,22 +138,11 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         if (windowBuffer != null) {
             windowBuffer.close();
         }
     }
 
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            windowBuffer.close();
-        }
-    }
-
     /** Compute memory size from memory faction. */
     private long computeMemorySize() {
         final Environment environment = getContainingTask().getEnvironment();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
index eba6e32..523594d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
@@ -151,28 +151,30 @@ public abstract class AbstractMapBundleOperator<K, V, IN, OUT> extends AbstractS
     }
 
     @Override
+    public void finish() throws Exception {
+        finishBundle();
+        super.finish();
+    }
+
+    @Override
     public void close() throws Exception {
+        Exception exception = null;
+
         try {
-            finishBundle();
-        } finally {
-            Exception exception = null;
-
-            try {
-                super.close();
-                if (function != null) {
-                    FunctionUtils.closeFunction(function);
-                }
-            } catch (InterruptedException interrupted) {
-                exception = interrupted;
-
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                exception = e;
+            super.close();
+            if (function != null) {
+                FunctionUtils.closeFunction(function);
             }
+        } catch (InterruptedException interrupted) {
+            exception = interrupted;
 
-            if (exception != null) {
-                LOG.warn("Errors occurred while closing the BundleOperator.", exception);
-            }
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            LOG.warn("Errors occurred while closing the BundleOperator.", exception);
         }
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
index a874922..3d3b560 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
@@ -134,6 +134,7 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW
     @Override
     public void close() throws Exception {
         FunctionUtils.closeFunction(joinCondition);
+        super.close();
     }
 
     /**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index bdd7cac..63ae148 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -220,6 +220,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
         if (joinCondition != null) {
             joinCondition.close();
         }
+        super.close();
     }
 
     /**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
index d51064f..66a5de2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -92,9 +92,6 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
     private final boolean[] filterNullKeys;
     private final ZoneId shiftTimeZone;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient WindowTimerService<Long> windowTimerService;
 
     // ------------------------------------------------------------------------
@@ -136,7 +133,6 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         this.collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
@@ -197,25 +193,12 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         if (joinCondition != null) {
             joinCondition.close();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            if (joinCondition != null) {
-                joinCondition.close();
-            }
-        }
-    }
-
-    @Override
     public void processElement1(StreamRecord<RowData> element) throws Exception {
         processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState);
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 32863f8..ba28e51 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -130,29 +130,29 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper
     }
 
     /**
-     * Closes all sub-operators in a multiple input operator effect way. Closing happens from
+     * Finish all sub-operators in a multiple input operator effect way. Finishing happens from
      * <b>head to tail</b> sub-operator in a multiple input operator, contrary to {@link
      * StreamOperator#open()} which happens <b>tail to head</b>.
      */
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
         for (TableOperatorWrapper<?> wrapper : topologicalOrderingOperators) {
-            wrapper.close();
+            StreamOperator<?> operator = wrapper.getStreamOperator();
+            operator.finish();
         }
     }
 
     /**
-     * Dispose all sub-operators in a multiple input operator effect way. Disposing happens from
+     * Closes all sub-operators in a multiple input operator effect way. Closing happens from
      * <b>head to tail</b> sub-operator in a multiple input operator, contrary to {@link
      * StreamOperator#open()} which happens <b>tail to head</b>.
      */
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         for (TableOperatorWrapper<?> wrapper : topologicalOrderingOperators) {
-            StreamOperator<?> operator = wrapper.getStreamOperator();
-            operator.dispose();
+            wrapper.close();
         }
     }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
index 642c03f..133f6e5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
@@ -134,8 +134,8 @@ public class StreamSortOperator extends TableStreamOperator<RowData>
     }
 
     @Override
-    public void close() throws Exception {
-        LOG.info("Closing StreamSortOperator");
+    public void finish() throws Exception {
+        LOG.info("Finishing StreamSortOperator");
 
         // BoundedOneInput can not coexistence with checkpoint, so we emit output in close.
         if (!inputBuffer.isEmpty()) {
@@ -153,6 +153,6 @@ public class StreamSortOperator extends TableStreamOperator<RowData>
                         }
                     });
         }
-        super.close();
+        super.finish();
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
index 77344b7..c2b5b29 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java
@@ -154,9 +154,6 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient InternalTimerService<W> internalTimerService;
 
     private transient InternalValueState<K, W, RowData> windowState;
@@ -242,8 +239,6 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
     public void open() throws Exception {
         super.open();
 
-        functionsClosed = false;
-
         collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
 
@@ -318,26 +313,12 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
         super.close();
         collector = null;
         triggerContext = null;
-        functionsClosed = true;
         if (windowAggregator != null) {
             windowAggregator.close();
         }
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        triggerContext = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            if (windowAggregator != null) {
-                windowAggregator.close();
-            }
-        }
-    }
-
-    @Override
     public void processElement(StreamRecord<RowData> record) throws Exception {
         RowData inputRow = record.getValue();
         long timestamp;
@@ -407,10 +388,6 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
 
     @Override
     public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-        if (functionsClosed) {
-            return;
-        }
-
         setCurrentKey(timer.getKey());
 
         triggerContext.window = timer.getNamespace();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
index 516f09b..1e2e093 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java
@@ -110,9 +110,6 @@ public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowDa
     /** This is used for emitting elements with a given timestamp. */
     protected transient TimestampedCollector<RowData> collector;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     /** The service to register timers. */
     private transient InternalTimerService<W> internalTimerService;
 
@@ -135,7 +132,6 @@ public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowDa
     @Override
     public void open() throws Exception {
         super.open();
-        functionsClosed = false;
 
         lastTriggeredProcessingTime = Long.MIN_VALUE;
         collector = new TimestampedCollector<>(output);
@@ -178,21 +174,10 @@ public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowDa
     public void close() throws Exception {
         super.close();
         collector = null;
-        functionsClosed = true;
         windowProcessor.close();
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        collector = null;
-        if (!functionsClosed) {
-            functionsClosed = true;
-            windowProcessor.close();
-        }
-    }
-
-    @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
         RowData inputRow = element.getValue();
         RowData currentKey = (RowData) getCurrentKey();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
index 69d9bf3..8ae817e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
@@ -106,9 +106,4 @@ public class ProcTimeMiniBatchAssignerOperator extends AbstractStreamOperator<Ro
             output.emitWatermark(mark);
         }
     }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
index 5fcbc17..079eec0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperator.java
@@ -98,8 +98,8 @@ public class RowTimeMiniBatchAssginerOperator extends AbstractStreamOperator<Row
     }
 
     @Override
-    public void close() throws Exception {
-        super.close();
+    public void finish() throws Exception {
+        super.finish();
 
         // emit the buffered watermark
         advanceWatermark();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
index d8b3a07..c0e664d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
@@ -56,9 +56,6 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
 
     private transient long lastRecordTime;
 
-    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-    private transient boolean functionsClosed = false;
-
     private transient StreamStatus currentStatus = StreamStatus.ACTIVE;
 
     /**
@@ -183,20 +180,14 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
     }
 
     @Override
-    public void close() throws Exception {
+    public void finish() throws Exception {
         // all records have been processed, emit a final watermark
         processWatermark(Watermark.MAX_WATERMARK);
-
-        functionsClosed = true;
-        FunctionUtils.closeFunction(watermarkGenerator);
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (!functionsClosed) {
-            functionsClosed = true;
-            FunctionUtils.closeFunction(watermarkGenerator);
-        }
+    public void close() throws Exception {
+        FunctionUtils.closeFunction(watermarkGenerator);
+        super.close();
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
index ee24464..d9dde71 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java
@@ -133,37 +133,6 @@ public class BatchMultipleInputStreamOperatorTest extends MultipleInputTestBase
     }
 
     @Test
-    public void testDispose() throws Exception {
-        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
-        TestingTwoInputStreamOperator joinOp2 =
-                (TestingTwoInputStreamOperator) op.getTailWrapper().getStreamOperator();
-
-        TableOperatorWrapper<?> joinWrapper1 = op.getTailWrapper().getInputWrappers().get(0);
-        TestingTwoInputStreamOperator joinOp1 =
-                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();
-
-        TableOperatorWrapper<?> aggWrapper1 = joinWrapper1.getInputWrappers().get(0);
-        TestingOneInputStreamOperator aggOp1 =
-                (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator();
-
-        TableOperatorWrapper<?> aggWrapper2 = joinWrapper1.getInputWrappers().get(1);
-        TestingOneInputStreamOperator aggOp2 =
-                (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator();
-
-        assertFalse(aggOp1.isDisposed());
-        assertFalse(aggOp2.isDisposed());
-        assertFalse(aggOp1.isDisposed());
-        assertFalse(joinOp2.isDisposed());
-
-        op.dispose();
-
-        assertTrue(aggOp1.isDisposed());
-        assertTrue(aggOp2.isDisposed());
-        assertTrue(joinOp1.isDisposed());
-        assertTrue(joinOp2.isDisposed());
-    }
-
-    @Test
     public void testClose() throws Exception {
         TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
         TestingTwoInputStreamOperator joinOp2 =
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
index efec9a9..c570db9 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java
@@ -41,7 +41,6 @@ public class TestingOneInputStreamOperator extends AbstractStreamOperator<RowDat
     private Watermark currentWatermark = null;
     private LatencyMarker currentLatencyMarker = null;
     private boolean isEnd = false;
-    private boolean isDisposed = false;
     private boolean isClosed = false;
     private final List<StreamRecord<RowData>> receivedElements = new ArrayList<>();
 
@@ -91,11 +90,6 @@ public class TestingOneInputStreamOperator extends AbstractStreamOperator<RowDat
     }
 
     @Override
-    public void dispose() throws Exception {
-        this.isDisposed = true;
-    }
-
-    @Override
     public void close() throws Exception {
         isClosed = true;
     }
@@ -120,10 +114,6 @@ public class TestingOneInputStreamOperator extends AbstractStreamOperator<RowDat
         return isEnd;
     }
 
-    public boolean isDisposed() {
-        return isDisposed;
-    }
-
     public boolean isClosed() {
         return isClosed;
     }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
index 25a40c9..252fa9a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingTwoInputStreamOperator.java
@@ -45,7 +45,6 @@ public class TestingTwoInputStreamOperator extends AbstractStreamOperator<RowDat
     private LatencyMarker currentLatencyMarker1 = null;
     private LatencyMarker currentLatencyMarker2 = null;
     private final List<Integer> endInputs = new ArrayList<>();
-    private boolean isDisposed = false;
     private boolean isClosed = false;
 
     public TestingTwoInputStreamOperator() {
@@ -118,11 +117,6 @@ public class TestingTwoInputStreamOperator extends AbstractStreamOperator<RowDat
     }
 
     @Override
-    public void dispose() throws Exception {
-        this.isDisposed = true;
-    }
-
-    @Override
     public void close() throws Exception {
         isClosed = true;
     }
@@ -159,10 +153,6 @@ public class TestingTwoInputStreamOperator extends AbstractStreamOperator<RowDat
         return endInputs;
     }
 
-    public boolean isDisposed() {
-        return isDisposed;
-    }
-
     public boolean isClosed() {
         return isClosed;
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 1dc9413..84d71d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -768,8 +768,8 @@ public class TimestampITCase extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
-            super.close();
+        public void finish() throws Exception {
+            super.finish();
             finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
         }
     }

Mime
View raw message