flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [7/7] flink git commit: [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent
Date Wed, 28 Feb 2018 16:16:02 GMT
[FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent

Because of race condition between:
  1. releasing inputChannelsWithData lock in this method and reaching this place
  2. empty data notification that re-enqueues a channel
we can end up with moreAvailable flag set to true, while we expect no more data.

This commit detects such situation, makes a correct assertion and turn off moreAvailable flag.

This closes #5588.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9b7416f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9b7416f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9b7416f

Branch: refs/heads/master
Commit: b9b7416f4d6a708d22029a5e971af5b1f67e3296
Parents: 767027f
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Tue Feb 27 10:39:00 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Wed Feb 28 17:13:24 2018 +0100

----------------------------------------------------------------------
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9b7416f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index be4035c..b9091b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -554,6 +554,12 @@ public class SingleInputGate implements InputGate {
 				channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
 
 				if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+					// Because of race condition between:
+					// 1. releasing inputChannelsWithData lock in this method and reaching this place
+					// 2. empty data notification that re-enqueues a channel
+					// we can end up with moreAvailable flag set to true, while we expect no more data.
+					checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
+					moreAvailable = false;
 					hasReceivedAllEndOfPartitionEvents = true;
 				}
 


Mime
View raw message