From commits-return-41764-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Jul 6 06:23:15 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 7047C180643 for ; Tue, 6 Jul 2021 08:23:15 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 8F84B3EEF6 for ; Tue, 6 Jul 2021 06:23:14 +0000 (UTC) Received: (qmail 2117 invoked by uid 500); 6 Jul 2021 06:23:13 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 2058 invoked by uid 99); 6 Jul 2021 06:23:13 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jul 2021 06:23:13 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E7FD08DC92; Tue, 6 Jul 2021 06:23:12 +0000 (UTC) Date: Tue, 06 Jul 2021 06:23:13 +0000 To: "commits@flink.apache.org" Subject: [flink] 02/03: [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: dwysakowicz@apache.org In-Reply-To: <162555259030.22930.10520666014172683221@gitbox.apache.org> References: <162555259030.22930.10520666014172683221@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: flink X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 541f43026203b56c0501a33ed54271270ac3e085 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210706062312.E7FD08DC92@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 541f43026203b56c0501a33ed54271270ac3e085 Author: Dawid Wysakowicz AuthorDate: Fri Jun 25 15:32:56 2021 +0200 [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish This commit cleans up StreamOperator API in regards to the termination phase and introduces a clean finish() method for flushing all records without releasing resources. The StreamOperator#close method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. This closes #16351 --- .../flink/state/api/output/BoundedStreamTask.java | 2 +- .../operators/StateBootstrapWrapperOperator.java | 8 +- .../flink/state/api/output/SnapshotUtilsTest.java | 8 +- .../python/AbstractPythonFunctionOperator.java | 20 ++-- .../PythonTimestampsAndWatermarksOperator.java | 4 +- ...stractArrowPythonAggregateFunctionOperator.java | 4 +- ...tBatchArrowPythonAggregateFunctionOperator.java | 4 +- .../RowDataArrowPythonScalarFunctionOperator.java | 14 +-- .../source/ContinuousFileReaderOperator.java | 104 ++++++++++----------- .../api/operators/AbstractStreamOperator.java | 23 +---- .../api/operators/AbstractStreamOperatorV2.java | 23 +---- .../api/operators/AbstractUdfStreamOperator.java | 13 --- .../streaming/api/operators/SourceOperator.java | 18 +--- .../streaming/api/operators/StreamOperator.java | 26 +++--- .../streaming/api/operators/StreamSource.java | 22 +++-- .../api/operators/collect/CollectSinkOperator.java | 4 +- .../runtime/operators/GenericWriteAheadSink.java | 1 + .../operators/TimestampsAndWatermarksOperator.java | 4 +- .../windowing/EvictingWindowOperator.java | 6 -- .../operators/windowing/WindowOperator.java | 9 -- .../streaming/runtime/tasks/OperatorChain.java | 6 +- .../runtime/tasks/StreamOperatorWrapper.java | 60 ++++++------ .../flink/streaming/runtime/tasks/StreamTask.java | 33 ++++--- .../AbstractUdfStreamOperatorLifecycleTest.java | 23 ++--- .../api/operators/SourceOperatorTest.java | 12 --- .../ContinuousFileProcessingRescalingTest.java | 2 +- .../StreamSourceOperatorLatencyMetricsTest.java | 2 +- .../runtime/tasks/MultipleInputStreamTaskTest.java | 43 ++------- .../runtime/tasks/OneInputStreamTaskTest.java | 8 +- .../runtime/tasks/SourceStreamTaskTest.java | 15 ++- .../runtime/tasks/StreamOperatorWrapperTest.java | 26 +++--- .../streaming/runtime/tasks/StreamTaskTest.java | 48 +++++----- .../tasks/SubtaskCheckpointCoordinatorTest.java | 4 +- .../tasks/TestBoundedOneInputStreamOperator.java | 10 +- .../runtime/tasks/TestBoundedTwoInputOperator.java | 11 ++- .../runtime/tasks/TwoInputStreamTaskTest.java | 6 +- .../util/AbstractStreamOperatorTestHarness.java | 6 +- .../util/TestBoundedMultipleInputOperator.java | 74 --------------- .../filesystem/stream/AbstractStreamingWriter.java | 4 +- .../runtime/operators/TableStreamOperator.java | 16 ---- .../window/LocalSlicingWindowAggOperator.java | 15 --- .../bundle/AbstractMapBundleOperator.java | 38 ++++---- .../temporal/TemporalProcessTimeJoinOperator.java | 1 + .../join/temporal/TemporalRowTimeJoinOperator.java | 1 + .../operators/join/window/WindowJoinOperator.java | 17 ---- .../MultipleInputStreamOperatorBase.java | 18 ++-- .../runtime/operators/sort/StreamSortOperator.java | 6 +- .../runtime/operators/window/WindowOperator.java | 23 ----- .../window/slicing/SlicingWindowOperator.java | 15 --- .../ProcTimeMiniBatchAssignerOperator.java | 5 - .../RowTimeMiniBatchAssginerOperator.java | 4 +- .../wmassigners/WatermarkAssignerOperator.java | 17 +--- .../BatchMultipleInputStreamOperatorTest.java | 31 ------ .../TestingOneInputStreamOperator.java | 10 -- .../TestingTwoInputStreamOperator.java | 10 -- .../test/streaming/runtime/TimestampITCase.java | 4 +- 56 files changed, 314 insertions(+), 627 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java index ee30d4f..257bc70 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java @@ -106,6 +106,7 @@ class BoundedStreamTask & Bo mainOperator.processElement(streamRecord); } else { mainOperator.endInput(); + mainOperator.finish(); controller.suspendDefaultAction(); mailboxProcessor.suspend(); } @@ -117,7 +118,6 @@ class BoundedStreamTask & Bo @Override protected void cleanup() throws Exception { mainOperator.close(); - mainOperator.dispose(); } private static class CollectorWrapper implements Output> { diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java index 245a7d4..92eb6d3 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java @@ -105,13 +105,13 @@ public final class StateBootstrapWrapperOperator< } @Override - public void close() throws Exception { - operator.close(); + public void finish() throws Exception { + operator.finish(); } @Override - public void dispose() throws Exception { - operator.dispose(); + public void close() throws Exception { + operator.close(); } @Override diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index 1178654..8430888 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -74,13 +74,13 @@ public class SnapshotUtilsTest { } @Override - public void close() throws Exception { - ACTUAL_ORDER_TRACKING.add("close"); + public void finish() throws Exception { + ACTUAL_ORDER_TRACKING.add("finish"); } @Override - public void dispose() throws Exception { - ACTUAL_ORDER_TRACKING.add("dispose"); + public void close() throws Exception { + ACTUAL_ORDER_TRACKING.add("close"); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index 387c14f..7abb6c1 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -139,22 +139,16 @@ public abstract class AbstractPythonFunctionOperator extends AbstractStream } @Override - public void close() throws Exception { + public void finish() throws Exception { try { invokeFinishBundle(); } finally { - super.close(); - - try { - cleanUpLeakingClasses(this.getClass().getClassLoader()); - } catch (Throwable t) { - LOG.warn("Failed to clean up the leaking objects.", t); - } + super.finish(); } } @Override - public void dispose() throws Exception { + public void close() throws Exception { try { if (checkFinishBundleTimer != null) { checkFinishBundleTimer.cancel(true); @@ -165,7 +159,13 @@ public abstract class AbstractPythonFunctionOperator extends AbstractStream pythonFunctionRunner = null; } } finally { - super.dispose(); + super.close(); + + try { + cleanUpLeakingClasses(this.getClass().getClassLoader()); + } catch (Throwable t) { + LOG.warn("Failed to clean up the leaking objects.", t); + } } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java index 996aa74..57aea3d 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonTimestampsAndWatermarksOperator.java @@ -181,8 +181,8 @@ public class PythonTimestampsAndWatermarksOperator } @Override - public void close() throws Exception { - super.close(); + public void finish() throws Exception { + super.finish(); watermarkGenerator.onPeriodicEmit(watermarkOutput); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java index b6fe695..a7c63cf 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java @@ -107,8 +107,8 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator } @Override - public void dispose() throws Exception { - super.dispose(); + public void close() throws Exception { + super.close(); if (arrowSerializer != null) { arrowSerializer.close(); arrowSerializer = null; diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java index ecbf2a9..4442cbd 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/AbstractBatchArrowPythonAggregateFunctionOperator.java @@ -100,9 +100,9 @@ abstract class AbstractBatchArrowPythonAggregateFunctionOperator } @Override - public void close() throws Exception { + public void finish() throws Exception { invokeCurrentBatch(); - super.close(); + super.finish(); } protected abstract void invokeCurrentBatch() throws Exception; diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java index b5193b5..cc2eca4 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java @@ -87,18 +87,18 @@ public class RowDataArrowPythonScalarFunctionOperator } @Override - public void dispose() throws Exception { - super.dispose(); - if (arrowSerializer != null) { - arrowSerializer.close(); - arrowSerializer = null; - } + public void finish() throws Exception { + invokeCurrentBatch(); + super.finish(); } @Override public void close() throws Exception { - invokeCurrentBatch(); super.close(); + if (arrowSerializer != null) { + arrowSerializer.close(); + arrowSerializer = null; + } } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 0df0be2..5c29092 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -82,8 +82,8 @@ import static org.apache.flink.util.Preconditions.checkState; * *
    *
  1. if {@link ReaderState#IDLE IDLE} then close immediately - *
  2. otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link - * MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#CLOSED CLOSED} + *
  3. otherwise switch to {@link ReaderState#FINISHING CLOSING}, call {@link + * MailboxExecutor#yield() yield} in a loop until state is {@link ReaderState#FINISHED CLOSED} *
  4. {@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be * processed in the same way as above *
@@ -139,7 +139,7 @@ public class ContinuousFileReaderOperator } }, /** - * No further processing can be done; only state disposal transition to {@link #CLOSED} + * No further processing can be done; only state disposal transition to {@link #FINISHED} * allowed. */ FAILED { @@ -153,7 +153,7 @@ public class ContinuousFileReaderOperator * {@link #close()} was called but unprocessed data (records and splits) remains and needs * to be processed. {@link #close()} caller is blocked. */ - CLOSING { + FINISHING { @Override public boolean prepareToProcessRecord( ContinuousFileReaderOperator op) throws IOException { @@ -168,10 +168,10 @@ public class ContinuousFileReaderOperator // need one more mail to unblock possible yield() in close() method (todo: wait with // timeout in yield) op.enqueueProcessRecord(); - op.switchState(CLOSED); + op.switchState(FINISHED); } }, - CLOSED { + FINISHED { @Override public boolean prepareToProcessRecord( ContinuousFileReaderOperator op) { @@ -186,12 +186,12 @@ public class ContinuousFileReaderOperator static { Map> tmpTransitions = new HashMap<>(); - tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED, FAILED)); - tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING, FAILED)); - tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING, FAILED)); - tmpTransitions.put(CLOSING, EnumSet.of(CLOSED, FAILED)); - tmpTransitions.put(FAILED, EnumSet.of(CLOSED)); - tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class)); + tmpTransitions.put(IDLE, EnumSet.of(OPENING, FINISHED, FAILED)); + tmpTransitions.put(OPENING, EnumSet.of(READING, FINISHING, FAILED)); + tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, FINISHING, FAILED)); + tmpTransitions.put(FINISHING, EnumSet.of(FINISHED, FAILED)); + tmpTransitions.put(FAILED, EnumSet.of(FINISHED)); + tmpTransitions.put(FINISHED, EnumSet.noneOf(ReaderState.class)); VALID_TRANSITIONS = new EnumMap<>(tmpTransitions); } @@ -200,7 +200,7 @@ public class ContinuousFileReaderOperator } public final boolean isTerminal() { - return this == CLOSED; + return this == FINISHED; } public boolean canSwitchTo(ReaderState next) { @@ -302,7 +302,7 @@ public class ContinuousFileReaderOperator this.state = ReaderState.IDLE; if (this.format instanceof RichInputFormat) { - ((RichInputFormat) this.format).setRuntimeContext(getRuntimeContext()); + ((RichInputFormat) this.format).setRuntimeContext(getRuntimeContext()); } this.format.configure(new Configuration()); @@ -380,7 +380,7 @@ public class ContinuousFileReaderOperator private void readAndCollectRecord() throws IOException { Preconditions.checkState( - state == ReaderState.READING || state == ReaderState.CLOSING, + state == ReaderState.READING || state == ReaderState.FINISHING, "can't process record in state %s", state); if (format.reachedEnd()) { @@ -394,14 +394,14 @@ public class ContinuousFileReaderOperator private void loadSplit(T split) throws IOException { Preconditions.checkState( - state != ReaderState.READING && state != ReaderState.CLOSED, + state != ReaderState.READING && state != ReaderState.FINISHED, "can't load split in state %s", state); Preconditions.checkNotNull(split, "split is null"); LOG.debug("load split: {}", split); currentSplit = split; if (this.format instanceof RichInputFormat) { - ((RichInputFormat) this.format).openInputFormat(); + ((RichInputFormat) this.format).openInputFormat(); } if (format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) { // recovering after a node failure with an input @@ -436,14 +436,38 @@ public class ContinuousFileReaderOperator } @Override - public void dispose() throws Exception { + public void finish() throws Exception { + LOG.debug("finishing"); + super.finish(); + + switch (state) { + case IDLE: + switchState(ReaderState.FINISHED); + break; + case FINISHED: + LOG.warn("operator is already closed, doing nothing"); + return; + default: + switchState(ReaderState.FINISHING); + while (!state.isTerminal()) { + executor.yield(); + } + } + + try { + sourceContext.emitWatermark(Watermark.MAX_WATERMARK); + } catch (Exception e) { + LOG.warn("unable to emit watermark while closing", e); + } + } + + @Override + public void close() throws Exception { Exception e = null; - if (state != ReaderState.CLOSED) { - try { - cleanUp(); - } catch (Exception ex) { - e = ex; - } + try { + cleanUp(); + } catch (Exception ex) { + e = ex; } { checkpointedState = null; @@ -457,7 +481,7 @@ public class ContinuousFileReaderOperator splits = null; } try { - super.dispose(); + super.close(); } catch (Exception ex) { e = ExceptionUtils.firstOrSuppressed(ex, e); } @@ -466,34 +490,6 @@ public class ContinuousFileReaderOperator } } - @Override - public void close() throws Exception { - LOG.debug("closing"); - super.close(); - - switch (state) { - case IDLE: - switchState(ReaderState.CLOSED); - break; - case CLOSED: - LOG.warn("operator is already closed, doing nothing"); - return; - default: - switchState(ReaderState.CLOSING); - while (!state.isTerminal()) { - executor.yield(); - } - } - - try { - sourceContext.emitWatermark(Watermark.MAX_WATERMARK); - } catch (Exception e) { - LOG.warn("unable to emit watermark while closing", e); - } - - cleanUp(); - } - private void cleanUp() throws Exception { LOG.debug("cleanup, state={}", state); @@ -502,7 +498,7 @@ public class ContinuousFileReaderOperator () -> format.close(), () -> { if (this.format instanceof RichInputFormat) { - ((RichInputFormat) this.format).closeInputFormat(); + ((RichInputFormat) this.format).closeInputFormat(); } } }; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 23da8de..15faaf7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -322,30 +322,11 @@ public abstract class AbstractStreamOperator @Override public void open() throws Exception {} - /** - * This method is called after all records have been added to the operators via the methods - * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link - * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link - * TwoInputStreamOperator#processElement2(StreamRecord)}. - * - *

The method is expected to flush all remaining buffered data. Exceptions during this - * flushing of buffered should be propagated, in order to cause the operation to be recognized - * asa failed, because the last data items are not processed properly. - * - * @throws Exception An exception in this method causes the operator to fail. - */ @Override - public void close() throws Exception {} + public void finish() throws Exception {} - /** - * This method is called at the very end of the operator's life, both in the case of a - * successful completion of the operation, and in the case of a failure and canceling. - * - *

This method is expected to make a thorough effort to release all resources that the - * operator has acquired. - */ @Override - public void dispose() throws Exception { + public void close() throws Exception { if (stateHandler != null) { stateHandler.dispose(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 702b753..28b2c44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -266,30 +266,11 @@ public abstract class AbstractStreamOperatorV2 @Override public void open() throws Exception {} - /** - * This method is called after all records have been added to the operators via the methods - * {@link OneInputStreamOperator#processElement(StreamRecord)}, or {@link - * TwoInputStreamOperator#processElement1(StreamRecord)} and {@link - * TwoInputStreamOperator#processElement2(StreamRecord)}. - * - *

The method is expected to flush all remaining buffered data. Exceptions during this - * flushing of buffered should be propagated, in order to cause the operation to be recognized - * asa failed, because the last data items are not processed properly. - * - * @throws Exception An exception in this method causes the operator to fail. - */ @Override - public void close() throws Exception {} + public void finish() throws Exception {} - /** - * This method is called at the very end of the operator's life, both in the case of a - * successful completion of the operation, and in the case of a failure and canceling. - * - *

This method is expected to make a thorough effort to release all resources that the - * operator has acquired. - */ @Override - public void dispose() throws Exception { + public void close() throws Exception { if (stateHandler != null) { stateHandler.dispose(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 1ece8e7..d41eb95 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -53,9 +53,6 @@ public abstract class AbstractUdfStreamOperator /** The user function. */ protected final F userFunction; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - public AbstractUdfStreamOperator(F userFunction) { this.userFunction = requireNonNull(userFunction); checkUdfCheckpointingPreconditions(); @@ -105,19 +102,9 @@ public abstract class AbstractUdfStreamOperator @Override public void close() throws Exception { super.close(); - functionsClosed = true; FunctionUtils.closeFunction(userFunction); } - @Override - public void dispose() throws Exception { - super.dispose(); - if (!functionsClosed) { - functionsClosed = true; - FunctionUtils.closeFunction(userFunction); - } - } - // ------------------------------------------------------------------------ // checkpointing and recovery // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 835ad5a..cc2fc60 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -127,9 +127,6 @@ public class SourceOperator extends AbstractStr */ private TimestampsAndWatermarks eventTimeLogic; - /** Indicating whether the source operator has been closed. */ - private boolean closed; - public SourceOperator( FunctionWithException, Exception> readerFactory, @@ -263,24 +260,19 @@ public class SourceOperator extends AbstractStr } @Override - public void close() throws Exception { + public void finish() throws Exception { if (eventTimeLogic != null) { eventTimeLogic.stopPeriodicWatermarkEmits(); } - if (sourceReader != null) { - sourceReader.close(); - } - closed = true; - super.close(); + super.finish(); } @Override - public void dispose() throws Exception { - // We also need to close the source reader to make sure the resources - // are released if the task does not finish normally. - if (!closed && sourceReader != null) { + public void close() throws Exception { + if (sourceReader != null) { sourceReader.close(); } + super.close(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index c4a90be..7c522fa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Disposable; import java.io.Serializable; @@ -44,8 +43,7 @@ import java.io.Serializable; * @param The output type of the operator */ @PublicEvolving -public interface StreamOperator - extends CheckpointListener, KeyContext, Disposable, Serializable { +public interface StreamOperator extends CheckpointListener, KeyContext, Serializable { // ------------------------------------------------------------------------ // life cycle @@ -64,21 +62,21 @@ public interface StreamOperator void open() throws Exception; /** - * This method is called after all records have been added to the operators via the methods - * {@link - * org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, - * or {@link - * org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} - * and {@link - * org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}. + * This method is called at the end of data processing. * *

The method is expected to flush all remaining buffered data. Exceptions during this * flushing of buffered should be propagated, in order to cause the operation to be recognized * as failed, because the last data items are not processed properly. * + *

After this method is called, no more records can be produced for the downstream + * operators. + * + *

NOTE:This method does not need to close any resources. You should release external + * resources in the {@link #close()} method. + * * @throws java.lang.Exception An exception in this method causes the operator to fail. */ - void close() throws Exception; + void finish() throws Exception; /** * This method is called at the very end of the operator's life, both in the case of a @@ -86,9 +84,11 @@ public interface StreamOperator * *

This method is expected to make a thorough effort to release all resources that the * operator has acquired. + * + *

NOTE:It can not emit any records! If you need to emit records at the end of + * processing, do so in the {@link #finish()} method. */ - @Override - void dispose() throws Exception; + void close() throws Exception; // ------------------------------------------------------------------------ // state snapshots diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 4957049..fef7f28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -132,18 +132,20 @@ public class StreamSource> } @Override + public void finish() throws Exception { + super.finish(); + if (!isCanceledOrStopped() && ctx != null) { + advanceToEndOfEventTime(); + } + } + + @Override public void close() throws Exception { - try { - super.close(); - if (!isCanceledOrStopped() && ctx != null) { - advanceToEndOfEventTime(); - } - } finally { - // make sure that the context is closed in any case - if (ctx != null) { - ctx.close(); - } + // make sure that the context is closed in any case + if (ctx != null) { + ctx.close(); } + super.close(); } public void cancel() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java index 5c84c9b..8b78bc3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java @@ -51,9 +51,9 @@ public class CollectSinkOperator extends StreamSink implements OperatorE } @Override - public void close() throws Exception { + public void finish() throws Exception { sinkFunction.accumulateFinalResults(); - super.close(); + super.finish(); } public CompletableFuture getOperatorIdFuture() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 4d007bb..4627c7d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -137,6 +137,7 @@ public abstract class GenericWriteAheadSink extends AbstractStreamOperator extends AbstractStreamOperator evictorContext = null; } - @Override - public void dispose() throws Exception { - super.dispose(); - evictorContext = null; - } - // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index f3f30a4..57bbfd1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -288,15 +288,6 @@ public class WindowOperator } @Override - public void dispose() throws Exception { - super.dispose(); - timestampedCollector = null; - triggerContext = null; - processContext = null; - windowAssignerContext = null; - } - - @Override public void processElement(StreamRecord element) throws Exception { final Collection elementWindows = windowAssigner.assignWindows( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 5bfef4e..d1a35b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -406,7 +406,7 @@ public class OperatorChain> /** * Initialize state and open all operators in the chain from tail to heads, contrary to * {@link StreamOperator#close()} which happens heads to tail (see {@link - * #closeOperators(StreamTaskActionExecutor)}). + * #finishOperators(StreamTaskActionExecutor)}). */ protected void initializeStateAndOpenOperators( StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { @@ -422,9 +422,9 @@ public class OperatorChain> * operator in the chain, contrary to {@link StreamOperator#open()} which happens tail to * heads (see {@link #initializeStateAndOpenOperators(StreamTaskStateInitializer)}). */ - protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception { + protected void finishOperators(StreamTaskActionExecutor actionExecutor) throws Exception { if (firstOperatorWrapper != null) { - firstOperatorWrapper.close(actionExecutor, ignoreEndOfInput); + firstOperatorWrapper.finish(actionExecutor, ignoreEndOfInput); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java index 401bc89..1b29fbc 100755 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java @@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It * also automatically propagates the close operation to the next wrapper that the {@link #next} * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and - * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method + * close all operators only by calling the {@link #finish(StreamTaskActionExecutor, boolean)} method * of the header operator wrapper. */ @Internal @@ -120,7 +120,7 @@ public class StreamOperatorWrapper> { * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run * them. */ - public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint) + public void finish(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint) throws Exception { if (!isHead && !isStoppingBySyncSavepoint) { // NOTE: This only do for the case where the operator is one-input operator. At present, @@ -128,68 +128,63 @@ public class StreamOperatorWrapper> { actionExecutor.runThrowing(() -> endOperatorInput(1)); } - quiesceTimeServiceAndCloseOperator(actionExecutor); + quiesceTimeServiceAndFinishOperator(actionExecutor); // propagate the close operation to the next wrapper if (next != null) { - next.close(actionExecutor, isStoppingBySyncSavepoint); + next.finish(actionExecutor, isStoppingBySyncSavepoint); } } - private void quiesceTimeServiceAndCloseOperator(StreamTaskActionExecutor actionExecutor) + private void quiesceTimeServiceAndFinishOperator(StreamTaskActionExecutor actionExecutor) throws InterruptedException, ExecutionException { // step 1. to ensure that there is no longer output triggered by the timers before invoking - // the "close()" - // method of the operator, we quiesce the processing time service to prevent the - // pending timers - // from firing, but wait the timers in running to finish - // step 2. invoke the "close()" method of the operator. executing the close operation must - // be deferred - // to the mailbox to ensure that mails already in the mailbox are finished before - // closing the - // operator + // the "finish()" method of the operator, we quiesce the processing time service to prevent + // the pending timers from firing, but wait the timers in running to finish + // step 2. invoke the "finish()" method of the operator. executing the close operation must + // be deferred to the mailbox to ensure that mails already in the mailbox are finished + // before closing the operator // step 3. send a closed mail to ensure that the mails that are from the operator and still - // in the mailbox - // are completed before exiting the following mailbox processing loop - CompletableFuture closedFuture = + // in the mailbox are completed before exiting the following mailbox processing loop + CompletableFuture finishedFuture = quiesceProcessingTimeService() - .thenCompose(unused -> deferCloseOperatorToMailbox(actionExecutor)) - .thenCompose(unused -> sendClosedMail()); + .thenCompose(unused -> deferFinishOperatorToMailbox(actionExecutor)) + .thenCompose(unused -> sendFinishedMail()); // run the mailbox processing loop until all operations are finished - while (!closedFuture.isDone()) { + while (!finishedFuture.isDone()) { while (mailboxExecutor.tryYield()) {} // we wait a little bit to avoid unnecessary CPU occupation due to empty loops, // such as when all mails of the operator have been processed but the closed future // has not been set to completed state try { - closedFuture.get(1, TimeUnit.MILLISECONDS); + finishedFuture.get(1, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { // do nothing } } // expose the exception thrown when closing - closedFuture.get(); + finishedFuture.get(); } - private CompletableFuture deferCloseOperatorToMailbox( + private CompletableFuture deferFinishOperatorToMailbox( StreamTaskActionExecutor actionExecutor) { - final CompletableFuture closeOperatorFuture = new CompletableFuture<>(); + final CompletableFuture finishOperatorFuture = new CompletableFuture<>(); mailboxExecutor.execute( () -> { try { - closeOperator(actionExecutor); - closeOperatorFuture.complete(null); + finishOperator(actionExecutor); + finishOperatorFuture.complete(null); } catch (Throwable t) { - closeOperatorFuture.completeExceptionally(t); + finishOperatorFuture.completeExceptionally(t); } }, "StreamOperatorWrapper#closeOperator for " + wrapped); - return closeOperatorFuture; + return finishOperatorFuture; } private CompletableFuture quiesceProcessingTimeService() { @@ -198,19 +193,20 @@ public class StreamOperatorWrapper> { .orElse(CompletableFuture.completedFuture(null)); } - private CompletableFuture sendClosedMail() { + private CompletableFuture sendFinishedMail() { final CompletableFuture future = new CompletableFuture<>(); mailboxExecutor.execute( - () -> future.complete(null), "StreamOperatorWrapper#sendClosedMail for " + wrapped); + () -> future.complete(null), + "StreamOperatorWrapper#sendFinishedMail for " + wrapped); return future; } - private void closeOperator(StreamTaskActionExecutor actionExecutor) throws Exception { + private void finishOperator(StreamTaskActionExecutor actionExecutor) throws Exception { actionExecutor.runThrowing( () -> { closed = true; - wrapped.close(); + wrapped.finish(); }); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a0326981..8ff3446 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -236,7 +236,7 @@ public abstract class StreamTask> extends Ab */ private volatile boolean failing; - private boolean disposedOperators; + private boolean closedOperators; /** Thread pool for async snapshot workers. */ private final ExecutorService asyncOperationsThreadPool; @@ -506,7 +506,7 @@ public abstract class StreamTask> extends Ab /** * Instructs the task to go through its normal termination routine, i.e. exit the run-loop and - * call {@link StreamOperator#close()} and {@link StreamOperator#dispose()} on its operators. + * call {@link StreamOperator#finish()} and {@link StreamOperator#close()} on its operators. * *

This is used by the source task to get out of the run-loop when the job is stopped with a * savepoint. @@ -552,7 +552,7 @@ public abstract class StreamTask> extends Ab LOG.debug("Re-restore attempt rejected."); return; } - disposedOperators = false; + closedOperators = false; LOG.debug("Initializing {}.", getName()); operatorChain = new OperatorChain<>(this, recordWriter); @@ -698,7 +698,7 @@ public abstract class StreamTask> extends Ab final CompletableFuture timersFinishedFuture = new CompletableFuture<>(); // close all operators in a chain effect way - operatorChain.closeOperators(actionExecutor); + operatorChain.finishOperators(actionExecutor); // If checkpoints are enabled, waits for all the records get processed by the downstream // tasks. During this process, this task could coordinate with its downstream tasks to @@ -758,7 +758,7 @@ public abstract class StreamTask> extends Ab // make an attempt to dispose the operators such that failures in the dispose call // still let the computation fail - disposeAllOperators(); + closeAllOperators(); } protected void cleanUpInvoke() throws Exception { @@ -789,8 +789,7 @@ public abstract class StreamTask> extends Ab suppressedException = runAndSuppressThrowable(this::cleanup, suppressedException); // if the operators were not disposed before, do a hard dispose - suppressedException = - runAndSuppressThrowable(this::disposeAllOperators, suppressedException); + suppressedException = runAndSuppressThrowable(this::closeAllOperators, suppressedException); // release the output resources. this method should never fail. suppressedException = @@ -888,24 +887,24 @@ public abstract class StreamTask> extends Ab } /** - * Execute @link StreamOperator#dispose()} of each operator in the chain of this {@link - * StreamTask}. Disposing happens from tail to head operator in the chain. + * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link + * StreamTask}. Closing happens from tail to head operator in the chain. */ - private void disposeAllOperators() throws Exception { - if (operatorChain != null && !disposedOperators) { - Exception disposalException = null; + private void closeAllOperators() throws Exception { + if (operatorChain != null && !closedOperators) { + Exception closingException = null; for (StreamOperatorWrapper operatorWrapper : operatorChain.getAllOperators(true)) { StreamOperator operator = operatorWrapper.getStreamOperator(); try { - operator.dispose(); + operator.close(); } catch (Exception e) { - disposalException = firstOrSuppressed(e, disposalException); + closingException = firstOrSuppressed(e, closingException); } } - disposedOperators = true; - if (disposalException != null) { - throw disposalException; + closedOperators = true; + if (closingException != null) { + throw closingException; } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index 8697353..6daaee8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -67,9 +67,9 @@ public class AbstractUdfStreamOperatorLifecycleTest { "UDF::run", "OPERATOR::prepareSnapshotPreBarrier", "OPERATOR::snapshotState", + "OPERATOR::finish", "OPERATOR::close", - "UDF::close", - "OPERATOR::dispose"); + "UDF::close"); private static final List EXPECTED_CALL_ORDER_CANCEL_RUNNING = Arrays.asList( @@ -82,13 +82,13 @@ public class AbstractUdfStreamOperatorLifecycleTest { "UDF::run", "OPERATOR::cancel", "UDF::cancel", - "OPERATOR::dispose", + "OPERATOR::close", "UDF::close"); private static final String ALL_METHODS_STREAM_OPERATOR = "[" + "close[], " - + "dispose[], " + + "finish[], " + "getCurrentKey[], " + "getMetricGroup[], " + "getOperatorID[], " @@ -320,6 +320,12 @@ public class AbstractUdfStreamOperatorLifecycleTest { } @Override + public void finish() throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::finish"); + super.finish(); + } + + @Override public void close() throws Exception { ACTUAL_ORDER_TRACKING.add("OPERATOR::close"); super.close(); @@ -330,14 +336,5 @@ public class AbstractUdfStreamOperatorLifecycleTest { ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel"); super.cancel(); } - - @Override - public void dispose() throws Exception { - ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose"); - super.dispose(); - if (simulateCheckpointing) { - testCheckpointer.join(); - } - } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 7c835f0..a518c33 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -95,7 +95,6 @@ public class SourceOperatorTest { @After public void cleanUp() throws Exception { operator.close(); - operator.dispose(); assertTrue(mockSourceReader.isClosed()); } @@ -187,17 +186,6 @@ public class SourceOperatorTest { assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); } - @Test - public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception { - // Initialize the operator. - operator.initializeState(getStateContext()); - // Open the operator. - operator.open(); - operator.close(); - operator.dispose(); - assertEquals(1, mockSourceReader.getTimesClosed()); - } - // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java index 64405d4..70a8bb7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -294,7 +294,7 @@ public class ContinuousFileProcessingRescalingTest { while (!getFormat().isLastProcessed()) { mailboxProcessor.runMailboxStep(); } - getHarness().close(); + harness.getOperator().finish(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java index 597d7fb..3e80b1a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java @@ -190,7 +190,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger { new MockEnvironmentBuilder().build())); try { operator.run(new Object(), new CollectorOutput<>(output), operatorChain); - operator.close(); + operator.finish(); } finally { operatorChain.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 71be94f..5be8ae25 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -73,7 +73,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator; -import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.SerializedValue; @@ -473,10 +472,11 @@ public class MultipleInputStreamTaskTest { LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, - LifeCycleTrackingMockSourceReader.CLOSE, - LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE, + LifeCycleTrackingMapToStringMultipleInputOperator.FINISH, LifeCycleTrackingMap.END_INPUT, - LifeCycleTrackingMap.CLOSE)); + LifeCycleTrackingMap.CLOSE, + LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE, + LifeCycleTrackingMockSourceReader.CLOSE)); } @Test @@ -999,35 +999,6 @@ public class MultipleInputStreamTaskTest { } } - private static class TestBoundedMultipleInputOperatorFactory - extends AbstractStreamOperatorFactory { - @Override - public > T createStreamOperator( - StreamOperatorParameters parameters) { - return (T) new TestBoundedMultipleInputOperator("Operator0", parameters); - } - - @Override - public Class> getStreamOperatorClass( - ClassLoader classLoader) { - return TestBoundedMultipleInputOperator.class; - } - } - - private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory { - @Override - public > T createStreamOperator( - StreamOperatorParameters parameters) { - return (T) new DuplicatingOperator(parameters); - } - - @Override - public Class> getStreamOperatorClass( - ClassLoader classLoader) { - return DuplicatingOperator.class; - } - } - /** Factory for {@link MapToStringMultipleInputOperator}. */ protected static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory { @@ -1138,6 +1109,7 @@ public class MultipleInputStreamTaskTest { extends MapToStringMultipleInputOperator implements BoundedMultiInput { public static final String OPEN = "MultipleInputOperator#open"; public static final String CLOSE = "MultipleInputOperator#close"; + public static final String FINISH = "MultipleInputOperator#finish"; public static final String END_INPUT = "MultipleInputOperator#endInput"; private static final long serialVersionUID = 1L; @@ -1163,6 +1135,11 @@ public class MultipleInputStreamTaskTest { public void endInput(int inputId) { LIFE_CYCLE_EVENTS.add(END_INPUT); } + + @Override + public void finish() throws Exception { + LIFE_CYCLE_EVENTS.add(FINISH); + } } static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index c845e1d..c5c653b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -714,9 +714,11 @@ public class OneInputStreamTaskTest extends TestLogger { expected, new StreamRecord<>("Hello"), new StreamRecord<>("[Operator0]: End of input"), - new StreamRecord<>("[Operator0]: Bye"), + new StreamRecord<>("[Operator0]: Finish"), new StreamRecord<>("[Operator1]: End of input"), - new StreamRecord<>("[Operator1]: Bye")); + new StreamRecord<>("[Operator1]: Finish"), + new StreamRecord<>("[Operator1]: Bye"), + new StreamRecord<>("[Operator0]: Bye")); final Object[] output = testHarness.getOutput().toArray(); assertArrayEquals("Output was not correct.", expected.toArray(), output); @@ -733,7 +735,7 @@ public class OneInputStreamTaskTest extends TestLogger { } @Override - public void close() throws Exception { + public void finish() throws Exception { // verify that the timer service is still running Assert.assertTrue( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 607d848..287f86f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -267,9 +267,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { expected, new StreamRecord<>("Hello"), new StreamRecord<>("[Source0]: End of input"), - new StreamRecord<>("[Source0]: Bye"), + new StreamRecord<>("[Source0]: Finish"), new StreamRecord<>("[Operator1]: End of input"), - new StreamRecord<>("[Operator1]: Bye")); + new StreamRecord<>("[Operator1]: Finish"), + new StreamRecord<>("[Operator1]: Bye"), + new StreamRecord<>("[Source0]: Bye")); final Object[] output = testHarness.getOutput().toArray(); assertArrayEquals("Output was not correct.", expected.toArray(), output); @@ -311,6 +313,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } expectedOutput.add(new StreamRecord<>("Hello")); + expectedOutput.add(new StreamRecord<>("[Operator1]: Bye")); TestHarnessUtil.assertOutputEquals( "Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -919,12 +922,18 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase { } @Override - public void close() throws Exception { + public void finish() throws Exception { ProcessingTimeService timeService = getProcessingTimeService(); timeService.registerTimer( timeService.getCurrentProcessingTime(), t -> output("[" + name + "]: Timer registered in close")); + output("[" + name + "]: Finish"); + super.finish(); + } + + @Override + public void close() throws Exception { output("[" + name + "]: Bye"); super.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java index 640efe0..7725f3e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java @@ -131,9 +131,9 @@ public class StreamOperatorWrapperTest extends TestLogger { } @Test - public void testClose() throws Exception { + public void testFinish() throws Exception { output.clear(); - operatorWrappers.get(0).close(containingTask.getActionExecutor(), false); + operatorWrappers.get(0).finish(containingTask.getActionExecutor(), false); List expected = new ArrayList<>(); for (int i = 0; i < operatorWrappers.size(); i++) { @@ -143,7 +143,7 @@ public class StreamOperatorWrapperTest extends TestLogger { prefix + ": End of input", prefix + ": Timer that was in mailbox before closing operator", prefix + ": Bye", - prefix + ": Mail to put in mailbox when closing operator"); + prefix + ": Mail to put in mailbox when finishing operator"); } assertArrayEquals( @@ -153,12 +153,12 @@ public class StreamOperatorWrapperTest extends TestLogger { } @Test - public void testClosingOperatorWithException() { - AbstractStreamOperator streamOperator = + public void testFinishingOperatorWithException() { + AbstractStreamOperator streamOperator = new AbstractStreamOperator() { @Override - public void close() throws Exception { - throw new Exception("test exception at closing"); + public void finish() throws Exception { + throw new Exception("test exception at finishing"); } }; @@ -172,11 +172,11 @@ public class StreamOperatorWrapperTest extends TestLogger { true); try { - operatorWrapper.close(containingTask.getActionExecutor(), false); + operatorWrapper.finish(containingTask.getActionExecutor(), false); fail("should throw an exception"); } catch (Throwable t) { Optional optional = - ExceptionUtils.findThrowableWithMessage(t, "test exception at closing"); + ExceptionUtils.findThrowableWithMessage(t, "test exception at finishing"); assertTrue(optional.isPresent()); } } @@ -313,20 +313,22 @@ public class StreamOperatorWrapperTest extends TestLogger { } @Override - public void close() throws Exception { + public void finish() throws Exception { ProcessingTimeCallback callback = t1 -> output.add( "[" + name - + "]: Timer to put in mailbox when closing operator"); + + "]: Timer to put in mailbox when finishing operator"); assertNotNull(processingTimeService.registerTimer(0, callback)); assertNull(timerMailController.getPuttingLatch(callback)); mailboxExecutor.submit( () -> output.add( - "[" + name + "]: Mail to put in mailbox when closing operator"), + "[" + + name + + "]: Mail to put in mailbox when finishing operator"), ""); output.add("[" + name + "]: Bye"); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index f2123b7..78356a8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -371,7 +371,7 @@ public class StreamTaskTest extends TestLogger { throw ex; } if (!(ex.getCause().getSuppressed()[0] - instanceof FailingTwiceOperator.DisposeException)) { + instanceof FailingTwiceOperator.CloseException)) { throw ex; } } @@ -387,13 +387,13 @@ public class StreamTaskTest extends TestLogger { } @Override - public void dispose() throws Exception { - throw new DisposeException(); + public void close() throws Exception { + throw new CloseException(); } - class DisposeException extends Exception { - public DisposeException() { - super("Dispose Exception. This exception should be suppressed"); + static class CloseException extends Exception { + public CloseException() { + super("Close Exception. This exception should be suppressed"); } } } @@ -1143,10 +1143,10 @@ public class StreamTaskTest extends TestLogger { */ @Test public void testOperatorClosingBeforeStopRunning() throws Throwable { - BlockingCloseStreamOperator.resetLatches(); + BlockingFinishStreamOperator.resetLatches(); Configuration taskConfiguration = new Configuration(); StreamConfig streamConfig = new StreamConfig(taskConfiguration); - streamConfig.setStreamOperator(new BlockingCloseStreamOperator()); + streamConfig.setStreamOperator(new BlockingFinishStreamOperator()); streamConfig.setOperatorID(new OperatorID()); try (MockEnvironment mockEnvironment = @@ -1158,16 +1158,16 @@ public class StreamTaskTest extends TestLogger { .setTaskConfiguration(taskConfiguration) .build()) { - RunningTask> task = + RunningTask> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment)); - BlockingCloseStreamOperator.inClose.await(); + BlockingFinishStreamOperator.inClose.await(); // check that the StreamTask is not yet in isRunning == false assertTrue(task.streamTask.isRunning()); // let the operator finish its close operation - BlockingCloseStreamOperator.finishClose.trigger(); + BlockingFinishStreamOperator.finishClose.trigger(); task.waitForTaskCompletion(false); @@ -1184,7 +1184,7 @@ public class StreamTaskTest extends TestLogger { */ @Test public void testNotifyCheckpointOnClosedOperator() throws Throwable { - ClosingOperator operator = new ClosingOperator(); + ClosingOperator operator = new ClosingOperator<>(); StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) @@ -1198,15 +1198,15 @@ public class StreamTaskTest extends TestLogger { harness.streamTask.notifyCheckpointCompleteAsync(1); harness.streamTask.runMailboxStep(); - assertEquals(1, operator.notified.get()); - assertEquals(false, operator.closed.get()); + assertEquals(1, ClosingOperator.notified.get()); + assertFalse(ClosingOperator.closed.get()); // close operators directly, so that task is still fully running - harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor()); + harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor()); harness.streamTask.notifyCheckpointCompleteAsync(2); harness.streamTask.runMailboxStep(); - assertEquals(1, operator.notified.get()); - assertEquals(true, operator.closed.get()); + assertEquals(1, ClosingOperator.notified.get()); + assertTrue(ClosingOperator.closed.get()); } @Test @@ -1247,7 +1247,7 @@ public class StreamTaskTest extends TestLogger { */ @Test public void testCheckpointDeclinedOnClosedOperator() throws Throwable { - ClosingOperator operator = new ClosingOperator(); + ClosingOperator operator = new ClosingOperator<>(); StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) @@ -1258,8 +1258,8 @@ public class StreamTaskTest extends TestLogger { harness.setAutoProcess(false); harness.processElement(new StreamRecord<>(1)); - harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor()); - assertEquals(true, operator.closed.get()); + harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor()); + assertTrue(ClosingOperator.closed.get()); harness.streamTask.triggerCheckpointOnBarrier( new CheckpointMetaData(1, 0), @@ -2057,14 +2057,14 @@ public class StreamTaskTest extends TestLogger { } } - private static class BlockingCloseStreamOperator extends AbstractStreamOperator { + private static class BlockingFinishStreamOperator extends AbstractStreamOperator { private static final long serialVersionUID = -9042150529568008847L; private static volatile OneShotLatch inClose; private static volatile OneShotLatch finishClose; @Override - public void close() throws Exception { + public void finish() throws Exception { checkLatches(); inClose.trigger(); finishClose.await(); @@ -2673,7 +2673,7 @@ public class StreamTaskTest extends TestLogger { } @Override - public void close() throws Exception { + public void finish() throws Exception { super.close(); closed.set(true); } @@ -2769,7 +2769,7 @@ public class StreamTaskTest extends TestLogger { } @Override - public void dispose() throws Exception { + public void close() throws Exception { wasClosed = true; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index de9c703..f023e10 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -547,10 +547,10 @@ public class SubtaskCheckpointCoordinatorTest { public void open() throws Exception {} @Override - public void close() throws Exception {} + public void finish() throws Exception {} @Override - public void dispose() {} + public void close() throws Exception {} @Override public void prepareSnapshotPreBarrier(long checkpointId) {} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java index b5f523d..05673c0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java @@ -53,12 +53,18 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator output("[" + name + "]: Timer registered in close")); + t -> output("[" + name + "]: Timer registered in finish")); + + output("[" + name + "]: Finish"); + super.finish(); + } + @Override + public void close() throws Exception { output("[" + name + "]: Bye"); super.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java index 5edd4fe..8dd49d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedTwoInputOperator.java @@ -51,14 +51,19 @@ public class TestBoundedTwoInputOperator extends AbstractStreamOperator } @Override - public void close() throws Exception { + public void finish() throws Exception { ProcessingTimeService timeService = getProcessingTimeService(); timeService.registerTimer( timeService.getCurrentProcessingTime(), t -> output("[" + name + "]: Timer registered in close")); - output.collect(new StreamRecord<>("[" + name + "]: Bye")); - super.close(); + output.collect(new StreamRecord<>("[" + name + "]: Finish")); + super.finish(); + } + + @Override + public void close() throws Exception { + output("[" + name + "]: Bye"); } private void output(String record) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 6e1fb62..d1ca9fc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -710,9 +710,11 @@ public class TwoInputStreamTaskTest { new StreamRecord<>("[Operator0-1]: End of input"), new StreamRecord<>("[Operator0-2]: Hello-2"), new StreamRecord<>("[Operator0-2]: End of input"), - new StreamRecord<>("[Operator0]: Bye"), + new StreamRecord<>("[Operator0]: Finish"), new StreamRecord<>("[Operator1]: End of input"), - new StreamRecord<>("[Operator1]: Bye")); + new StreamRecord<>("[Operator1]: Finish"), + new StreamRecord<>("[Operator1]: Bye"), + new StreamRecord<>("[Operator0]: Bye")); final Object[] output = testHarness.getOutput().toArray(); assertArrayEquals("Output was not correct.", expected.toArray(), output); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 3663052..95ad25e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -686,14 +686,14 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable { operator.notifyCheckpointComplete(checkpointId); } - /** Calls close and dispose on the operator. */ + /** Calls finish and close on the operator. */ public void close() throws Exception { - operator.close(); - operator.dispose(); if (processingTimeService != null) { processingTimeService.shutdownService(); } setupCalled = false; + operator.finish(); + operator.close(); if (internalEnvironment.isPresent()) { internalEnvironment.get().close(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java deleted file mode 100644 index 6db4adf..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.streaming.api.operators.AbstractInput; -import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; -import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.Input; -import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.Arrays; -import java.util.List; - -/** A test operator class implementing {@link BoundedMultiInput}. */ -public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2 - implements MultipleInputStreamOperator, BoundedMultiInput { - - private static final long serialVersionUID = 1L; - - private final String name; - - public TestBoundedMultipleInputOperator( - String name, StreamOperatorParameters parameters) { - super(parameters, 3); - this.name = name; - } - - @Override - public List getInputs() { - return Arrays.asList( - new TestInput(this, 1), new TestInput(this, 2), new TestInput(this, 3)); - } - - @Override - public void endInput(int inputId) { - output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: End of input")); - } - - @Override - public void close() throws Exception { - output.collect(new StreamRecord<>("[" + name + "]: Bye")); - super.close(); - } - - class TestInput extends AbstractInput { - public TestInput(AbstractStreamOperatorV2 owner, int inputId) { - super(owner, inputId); - } - - @Override - public void processElement(StreamRecord element) throws Exception { - output.collect( - element.replace("[" + name + "-" + inputId + "]: " + element.getValue())); - } - } -} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java index 73f6f1f..2a96c62 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java @@ -159,8 +159,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe } @Override - public void dispose() throws Exception { - super.dispose(); + public void close() throws Exception { + super.close(); if (helper != null) { helper.close(); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java index b94da3d..ef67090 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java @@ -40,8 +40,6 @@ public abstract class TableStreamOperator extends AbstractStreamOperator extends AbstractStreamOperator collector; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - /** current watermark of this operator. */ private transient long currentWatermark; @@ -94,7 +91,6 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator(output); collector.eraseTimestamp(); @@ -142,22 +138,11 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator extends AbstractS } @Override + public void finish() throws Exception { + finishBundle(); + super.finish(); + } + + @Override public void close() throws Exception { + Exception exception = null; + try { - finishBundle(); - } finally { - Exception exception = null; - - try { - super.close(); - if (function != null) { - FunctionUtils.closeFunction(function); - } - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + super.close(); + if (function != null) { + FunctionUtils.closeFunction(function); } + } catch (InterruptedException interrupted) { + exception = interrupted; - if (exception != null) { - LOG.warn("Errors occurred while closing the BundleOperator.", exception); - } + Thread.currentThread().interrupt(); + } catch (Exception e) { + exception = e; + } + + if (exception != null) { + LOG.warn("Errors occurred while closing the BundleOperator.", exception); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java index a874922..3d3b560 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java @@ -134,6 +134,7 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW @Override public void close() throws Exception { FunctionUtils.closeFunction(joinCondition); + super.close(); } /** diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index bdd7cac..63ae148 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -220,6 +220,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS if (joinCondition != null) { joinCondition.close(); } + super.close(); } /** diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java index d51064f..66a5de2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -92,9 +92,6 @@ public abstract class WindowJoinOperator extends TableStreamOperator private final boolean[] filterNullKeys; private final ZoneId shiftTimeZone; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - private transient WindowTimerService windowTimerService; // ------------------------------------------------------------------------ @@ -136,7 +133,6 @@ public abstract class WindowJoinOperator extends TableStreamOperator @Override public void open() throws Exception { super.open(); - functionsClosed = false; this.collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); @@ -197,25 +193,12 @@ public abstract class WindowJoinOperator extends TableStreamOperator public void close() throws Exception { super.close(); collector = null; - functionsClosed = true; if (joinCondition != null) { joinCondition.close(); } } @Override - public void dispose() throws Exception { - super.dispose(); - collector = null; - if (!functionsClosed) { - functionsClosed = true; - if (joinCondition != null) { - joinCondition.close(); - } - } - } - - @Override public void processElement1(StreamRecord element) throws Exception { processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java index 32863f8..ba28e51 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java @@ -130,29 +130,29 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper } /** - * Closes all sub-operators in a multiple input operator effect way. Closing happens from + * Finish all sub-operators in a multiple input operator effect way. Finishing happens from * head to tail sub-operator in a multiple input operator, contrary to {@link * StreamOperator#open()} which happens tail to head. */ @Override - public void close() throws Exception { - super.close(); + public void finish() throws Exception { + super.finish(); for (TableOperatorWrapper wrapper : topologicalOrderingOperators) { - wrapper.close(); + StreamOperator operator = wrapper.getStreamOperator(); + operator.finish(); } } /** - * Dispose all sub-operators in a multiple input operator effect way. Disposing happens from + * Closes all sub-operators in a multiple input operator effect way. Closing happens from * head to tail sub-operator in a multiple input operator, contrary to {@link * StreamOperator#open()} which happens tail to head. */ @Override - public void dispose() throws Exception { - super.dispose(); + public void close() throws Exception { + super.close(); for (TableOperatorWrapper wrapper : topologicalOrderingOperators) { - StreamOperator operator = wrapper.getStreamOperator(); - operator.dispose(); + wrapper.close(); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java index 642c03f..133f6e5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java @@ -134,8 +134,8 @@ public class StreamSortOperator extends TableStreamOperator } @Override - public void close() throws Exception { - LOG.info("Closing StreamSortOperator"); + public void finish() throws Exception { + LOG.info("Finishing StreamSortOperator"); // BoundedOneInput can not coexistence with checkpoint, so we emit output in close. if (!inputBuffer.isEmpty()) { @@ -153,6 +153,6 @@ public class StreamSortOperator extends TableStreamOperator } }); } - super.close(); + super.finish(); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java index 77344b7..c2b5b29 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java @@ -154,9 +154,6 @@ public abstract class WindowOperator extends AbstractStream /** This is used for emitting elements with a given timestamp. */ protected transient TimestampedCollector collector; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - private transient InternalTimerService internalTimerService; private transient InternalValueState windowState; @@ -242,8 +239,6 @@ public abstract class WindowOperator extends AbstractStream public void open() throws Exception { super.open(); - functionsClosed = false; - collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); @@ -318,26 +313,12 @@ public abstract class WindowOperator extends AbstractStream super.close(); collector = null; triggerContext = null; - functionsClosed = true; if (windowAggregator != null) { windowAggregator.close(); } } @Override - public void dispose() throws Exception { - super.dispose(); - collector = null; - triggerContext = null; - if (!functionsClosed) { - functionsClosed = true; - if (windowAggregator != null) { - windowAggregator.close(); - } - } - } - - @Override public void processElement(StreamRecord record) throws Exception { RowData inputRow = record.getValue(); long timestamp; @@ -407,10 +388,6 @@ public abstract class WindowOperator extends AbstractStream @Override public void onProcessingTime(InternalTimer timer) throws Exception { - if (functionsClosed) { - return; - } - setCurrentKey(timer.getKey()); triggerContext.window = timer.getNamespace(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java index 516f09b..1e2e093 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java @@ -110,9 +110,6 @@ public final class SlicingWindowOperator extends TableStreamOperator collector; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - /** The service to register timers. */ private transient InternalTimerService internalTimerService; @@ -135,7 +132,6 @@ public final class SlicingWindowOperator extends TableStreamOperator(output); @@ -178,21 +174,10 @@ public final class SlicingWindowOperator extends TableStreamOperator element) throws Exception { RowData inputRow = element.getValue(); RowData currentKey = (RowData) getCurrentKey(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java index 69d9bf3..8ae817e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java @@ -106,9 +106,4 @@ public class ProcTimeMiniBatchAssignerOperator extends AbstractStreamOperator private transient long lastRecordTime; - /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ - private transient boolean functionsClosed = false; - private transient StreamStatus currentStatus = StreamStatus.ACTIVE; /** @@ -183,20 +180,14 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator } @Override - public void close() throws Exception { + public void finish() throws Exception { // all records have been processed, emit a final watermark processWatermark(Watermark.MAX_WATERMARK); - - functionsClosed = true; - FunctionUtils.closeFunction(watermarkGenerator); } @Override - public void dispose() throws Exception { - super.dispose(); - if (!functionsClosed) { - functionsClosed = true; - FunctionUtils.closeFunction(watermarkGenerator); - } + public void close() throws Exception { + FunctionUtils.closeFunction(watermarkGenerator); + super.close(); } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java index ee24464..d9dde71 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.java @@ -133,37 +133,6 @@ public class BatchMultipleInputStreamOperatorTest extends MultipleInputTestBase } @Test - public void testDispose() throws Exception { - TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator(); - TestingTwoInputStreamOperator joinOp2 = - (TestingTwoInputStreamOperator) op.getTailWrapper().getStreamOperator(); - - TableOperatorWrapper joinWrapper1 = op.getTailWrapper().getInputWrappers().get(0); - TestingTwoInputStreamOperator joinOp1 = - (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator(); - - TableOperatorWrapper aggWrapper1 = joinWrapper1.getInputWrappers().get(0); - TestingOneInputStreamOperator aggOp1 = - (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator(); - - TableOperatorWrapper aggWrapper2 = joinWrapper1.getInputWrappers().get(1); - TestingOneInputStreamOperator aggOp2 = - (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator(); - - assertFalse(aggOp1.isDisposed()); - assertFalse(aggOp2.isDisposed()); - assertFalse(aggOp1.isDisposed()); - assertFalse(joinOp2.isDisposed()); - - op.dispose(); - - assertTrue(aggOp1.isDisposed()); - assertTrue(aggOp2.isDisposed()); - assertTrue(joinOp1.isDisposed()); - assertTrue(joinOp2.isDisposed()); - } - - @Test public void testClose() throws Exception { TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator(); TestingTwoInputStreamOperator joinOp2 = diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java index efec9a9..c570db9 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestingOneInputStreamOperator.java @@ -41,7 +41,6 @@ public class TestingOneInputStreamOperator extends AbstractStreamOperator> receivedElements = new ArrayList<>(); @@ -91,11 +90,6 @@ public class TestingOneInputStreamOperator extends AbstractStreamOperator endInputs = new ArrayList<>(); - private boolean isDisposed = false; private boolean isClosed = false; public TestingTwoInputStreamOperator() { @@ -118,11 +117,6 @@ public class TestingTwoInputStreamOperator extends AbstractStreamOperator