flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] pnowojski commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput
Date Fri, 25 Sep 2020 13:32:41 GMT

pnowojski commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r494110420



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       I think this could lead to more confusion and bugs. It would be better to keep it simple
and not support it. In your example above, someone could read `endInput()` as ending
all inputs.
   
   Also, as always: keeping it simple now means we can extend it in the future if
we change our mind. But if we decide to support some strange combination now "just because
we can", backing out of it won't be possible without breaking compatibility.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       You are right, it's currently undefined behaviour. IMO all of the above should be
disallowed.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       You are right, it's currently undefined behaviour. IMO all of the above should be
disallowed. At the very least, they were implicitly disallowed by the runtime, as shown by
the lack of test coverage for those combinations.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
 	public void setKeyContextElement(StreamRecord record) throws Exception {
 		owner.internalSetKeyContextElement(record, stateKeySelector);
 	}
+
+	@Override
+	public void endInput() throws Exception {
+		if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       You are right, it's currently undefined behaviour. IMO all of the above should be
disallowed. At the very least, they were implicitly disallowed while we were implementing
bounded inputs, as shown by the lack of test coverage for those combinations.
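
   To illustrate what "disallowed" could look like, here is a minimal sketch, not code from
this pull request. It assumes the check happens once, up front, instead of branching on
`owner instanceof BoundedOneInput && inputId == 1` inside `endInput()`. The two interfaces
are simplified stand-ins named after the Flink ones, and `EndInputValidation`, `checkOwner`
and `numberOfInputs` are hypothetical names used only for this example.

```java
// Simplified stand-ins for the interfaces discussed in the review (assumption:
// only the end-of-input callbacks matter for this sketch).
interface BoundedOneInput {
    void endInput() throws Exception;
}

interface BoundedMultiInput {
    void endInput(int inputId) throws Exception;
}

// Hypothetical helper: rejects the ambiguous combination instead of guessing
// which input a parameterless endInput() refers to.
final class EndInputValidation {

    private EndInputValidation() {}

    static void checkOwner(Object owner, int numberOfInputs) {
        if (numberOfInputs > 1 && owner instanceof BoundedOneInput) {
            throw new IllegalArgumentException(
                "A multi-input operator must not implement BoundedOneInput; "
                    + "implement BoundedMultiInput#endInput(int) instead.");
        }
    }

    public static void main(String[] args) {
        // A two-input owner using the multi-input callback passes the check.
        checkOwner((BoundedMultiInput) inputId -> {}, 2);

        // A two-input owner using the single-input callback is rejected.
        try {
            checkOwner((BoundedOneInput) () -> {}, 2);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Failing fast like this keeps the contract narrow: if a combination later turns out to be
useful, relaxing the check is a compatible change, while removing support is not.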




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


