From: GitBox
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [GitHub] [flink] pnowojski commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput
Message-ID: <160104076166.32230.3894677393329494400.asfpy@gitbox.apache.org>
Date: Fri, 25 Sep 2020 13:32:41 -0000

pnowojski commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r494110420

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   I think this could lead to more confusion and bugs. It would be better to keep it simple and not support it. Someone could understand `endInput()` in your example above as ending all inputs.

   Also, as always, keeping it simpler now means we can always extend it in the future if we change our mind. But if we decide to support some strange combination now "just because we can", backing out of it won't be possible without breaking compatibility.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   You are right, it's currently undefined behaviour. IMO all of the above should be disallowed. And at the very least, they were implicitly disallowed by the runtime, as shown by the lack of test coverage for those combinations.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   You are right, it's currently undefined behaviour. IMO all of the above should be disallowed. And at the very least, they were implicitly disallowed while we were implementing bounded inputs, as shown by the lack of test coverage for those combinations.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org