flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [5/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition
Date Wed, 28 Feb 2018 16:17:51 GMT
[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Previously, there was a race condition that could result in some notifyDataAvailable
calls being ignored.
This fixes the problem by moving the buffersAvailable handling to the subpartitions and
adds a stress test for flushAlways (without this fix, that test deadlocks).

(cherry picked from commit ebd39f3)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb6a307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb6a307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb6a307

Branch: refs/heads/release-1.5
Commit: 8eb6a30798c09d171e3eb8019b53e677252bd5ba
Parents: 8e62f90
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Fri Feb 23 11:28:20 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Wed Feb 28 17:17:02 2018 +0100

----------------------------------------------------------------------
 .../CreditBasedSequenceNumberingViewReader.java | 10 +---
 .../netty/SequenceNumberingViewReader.java      |  7 +--
 .../partition/PipelinedSubpartition.java        | 37 ++++++++++++--
 .../partition/PipelinedSubpartitionView.java    |  5 ++
 .../partition/ResultSubpartitionView.java       |  2 +
 .../partition/SpillableSubpartition.java        |  1 -
 .../partition/SpillableSubpartitionView.java    | 28 ++++++++---
 .../partition/SpilledSubpartitionView.java      |  8 +++
 .../network/buffer/BufferBuilderTestUtils.java  |  4 ++
 .../netty/CancelPartitionRequestTest.java       |  5 ++
 .../netty/PartitionRequestQueueTest.java        | 26 ++++++++--
 .../partition/PipelinedSubpartitionTest.java    | 53 ++++++++++++++++++++
 .../partition/SpillableSubpartitionTest.java    |  9 ++--
 .../network/partition/SubpartitionTestBase.java |  5 ++
 .../StreamNetworkThroughputBenchmarkTests.java  |  8 +++
 15 files changed, 173 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index d02b2bf..9acbbac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the new network credit-based mode.
@@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 
 	private final InputChannelID receiverId;
 
-	private final AtomicBoolean buffersAvailable = new AtomicBoolean();
-
 	private final PartitionRequestQueue requestQueue;
 
 	private volatile ResultSubpartitionView subpartitionView;
@@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 	@Override
 	public boolean isAvailable() {
 		// BEWARE: this must be in sync with #isAvailable()!
-		return buffersAvailable.get() &&
+		return hasBuffersAvailable() &&
 			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
 	}
 
@@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 
 	@VisibleForTesting
 	boolean hasBuffersAvailable() {
-		return buffersAvailable.get();
+		return subpartitionView.isAvailable();
 	}
 
 	@Override
 	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 		BufferAndBacklog next = subpartitionView.getNextBuffer();
 		if (next != null) {
-			buffersAvailable.set(next.isMoreAvailable());
 			sequenceNumber++;
 
 			if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
@@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 
 	@Override
 	public void notifyDataAvailable() {
-		buffersAvailable.set(true);
 		requestQueue.notifyReaderNonEmpty(this);
 	}
 
@@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 		return "CreditBasedSequenceNumberingViewReader{" +
 			"requestLock=" + requestLock +
 			", receiverId=" + receiverId +
-			", buffersAvailable=" + buffersAvailable.get() +
 			", sequenceNumber=" + sequenceNumber +
 			", numCreditsAvailable=" + numCreditsAvailable +
 			", isRegisteredAsAvailable=" + isRegisteredAsAvailable +

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 2d9635c..6a83af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the old network mode.
@@ -43,8 +42,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener,
Network
 
 	private final InputChannelID receiverId;
 
-	private final AtomicBoolean buffersAvailable = new AtomicBoolean();
-
 	private final PartitionRequestQueue requestQueue;
 
 	private volatile ResultSubpartitionView subpartitionView;
@@ -96,7 +93,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener,
Network
 
 	@Override
 	public boolean isAvailable() {
-		return buffersAvailable.get();
+		return subpartitionView.isAvailable();
 	}
 
 	@Override
@@ -113,7 +110,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener,
Network
 	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 		BufferAndBacklog next = subpartitionView.getNextBuffer();
 		if (next != null) {
-			buffersAvailable.set(next.isMoreAvailable());
 			sequenceNumber++;
 			return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog());
 		} else {
@@ -143,7 +139,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener,
Network
 
 	@Override
 	public void notifyDataAvailable() {
-		buffersAvailable.set(true);
 		requestQueue.notifyReaderNonEmpty(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index a9c6e57..cc79363 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 
@@ -48,6 +49,9 @@ class PipelinedSubpartition extends ResultSubpartition {
 	/** Flag indicating whether the subpartition has been finished. */
 	private boolean isFinished;
 
+	@GuardedBy("buffers")
+	private boolean flushRequested;
+
 	/** Flag indicating whether the subpartition has been released. */
 	private volatile boolean isReleased;
 
@@ -65,9 +69,11 @@ class PipelinedSubpartition extends ResultSubpartition {
 	@Override
 	public void flush() {
 		synchronized (buffers) {
-			if (readView != null) {
-				readView.notifyDataAvailable();
+			if (buffers.isEmpty()) {
+				return;
 			}
+			flushRequested = !buffers.isEmpty();
+			notifyDataAvailable();
 		}
 	}
 
@@ -93,7 +99,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			if (finish) {
 				isFinished = true;
-				notifyDataAvailable();
+				flush();
 			}
 			else {
 				maybeNotifyDataAvailable();
@@ -138,17 +144,28 @@ class PipelinedSubpartition extends ResultSubpartition {
 		synchronized (buffers) {
 			Buffer buffer = null;
 
+			if (buffers.isEmpty()) {
+				flushRequested = false;
+			}
+
 			while (!buffers.isEmpty()) {
 				BufferConsumer bufferConsumer = buffers.peek();
 
 				buffer = bufferConsumer.build();
+
 				checkState(bufferConsumer.isFinished() || buffers.size() == 1,
 					"When there are multiple buffers, an unfinished bufferConsumer can not be at the head
of the buffers queue.");
 
+				if (buffers.size() == 1) {
+					// turn off flushRequested flag if we drained all of the available data
+					flushRequested = false;
+				}
+
 				if (bufferConsumer.isFinished()) {
 					buffers.pop().close();
 					decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
 				}
+
 				if (buffer.readableBytes() > 0) {
 					break;
 				}
@@ -169,7 +186,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 			// will be 2 or more.
 			return new BufferAndBacklog(
 				buffer,
-				getNumberOfFinishedBuffers() > 0,
+				isAvailableUnsafe(),
 				getBuffersInBacklog(),
 				_nextBufferIsEvent());
 		}
@@ -211,13 +228,23 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
 			if (!buffers.isEmpty()) {
-				readView.notifyDataAvailable();
+				notifyDataAvailable();
 			}
 		}
 
 		return readView;
 	}
 
+	public boolean isAvailable() {
+		synchronized (buffers) {
+			return isAvailableUnsafe();
+		}
+	}
+
+	private boolean isAvailableUnsafe() {
+		return flushRequested || getNumberOfFinishedBuffers() > 0;
+	}
+
 	// ------------------------------------------------------------------------
 
 	int getCurrentNumberOfBuffers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index c60a604..9d08358 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -81,6 +81,11 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	}
 
 	@Override
+	public boolean isAvailable() {
+		return parent.isAvailable();
+	}
+
+	@Override
 	public Throwable getFailureCause() {
 		return parent.getFailureCause();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 41fbb0a..b1ccd63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -57,4 +57,6 @@ public interface ResultSubpartitionView {
 	 * Returns whether the next buffer is an event or not.
 	 */
 	boolean nextBufferIsEvent();
+
+	boolean isAvailable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 6ac493e..6b731d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -209,7 +209,6 @@ class SpillableSubpartition extends ResultSubpartition {
 					parent.getBufferProvider().getMemorySegmentSize(),
 					availabilityListener);
 			}
-
 			return readView;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index b821dcf..3c73e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -153,18 +153,16 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 				return null;
 			} else if (nextBuffer != null) {
 				current = nextBuffer.build();
+				checkState(nextBuffer.isFinished(),
+					"We can only read from SpillableSubpartition after it was finished");
 
-				if (nextBuffer.isFinished()) {
-					newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
-					nextBuffer.close();
-					nextBuffer = buffers.poll();
-				}
+				newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
+				nextBuffer.close();
+				nextBuffer = buffers.poll();
 
-				isMoreAvailable = buffers.size() > 0;
 				if (nextBuffer != null) {
-					isMoreAvailable = true;
-					listener.notifyDataAvailable();
 					nextBufferIsEvent = !nextBuffer.isBuffer();
+					isMoreAvailable = true;
 				}
 
 				parent.updateStatistics(current);
@@ -246,6 +244,20 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	}
 
 	@Override
+	public boolean isAvailable() {
+		synchronized (buffers) {
+			if (nextBuffer != null) {
+				return true;
+			}
+			else if (spilledView == null) {
+				return false;
+			}
+		} // else: spilled
+
+		return spilledView.isAvailable();
+	}
+
+	@Override
 	public Throwable getFailureCause() {
 		SpilledSubpartitionView spilled = spilledView;
 		if (spilled != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 4c5cd2e..378b086 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -220,6 +220,14 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 	}
 
 	@Override
+	public synchronized boolean isAvailable() {
+		if (nextBuffer != null) {
+			return true;
+		}
+		return !fileReader.hasReachedEndOfFile();
+	}
+
+	@Override
 	public Throwable getFailureCause() {
 		return parent.getFailureCause();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index a6e9fdc..7beb18f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -38,6 +38,10 @@ public class BufferBuilderTestUtils {
 		return createFilledBufferBuilder(size, 0);
 	}
 
+	public static BufferBuilder createFilledBufferBuilder(int dataSize) {
+		return createFilledBufferBuilder(BUFFER_SIZE, dataSize);
+	}
+
 	public static BufferBuilder createFilledBufferBuilder(int size, int dataSize) {
 		checkArgument(size >= dataSize);
 		BufferBuilder bufferBuilder = new BufferBuilder(

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 56abff1..eca8263 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -220,6 +220,11 @@ public class CancelPartitionRequestTest {
 		}
 
 		@Override
+		public boolean isAvailable() {
+			return true;
+		}
+
+		@Override
 		public Throwable getFailureCause() {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 16418ff..f614c18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -66,7 +66,7 @@ public class PartitionRequestQueueTest {
 		CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new
InputChannelID(0, 0), 10, queue);
 		CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new
InputChannelID(1, 1), 10, queue);
 
-		reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new NotReleasedResultSubpartitionView(),
new ResultPartitionID(), 0);
+		reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new EmptyAlwaysAvailableResultSubpartitionView(),
new ResultPartitionID(), 0);
 		reader1.notifyDataAvailable();
 		assertTrue(reader1.isAvailable());
 		assertFalse(reader1.isRegisteredAsAvailable());
@@ -178,6 +178,11 @@ public class PartitionRequestQueueTest {
 				buffers,
 				false);
 		}
+
+		@Override
+		public boolean isAvailable() {
+			return buffersInBacklog.get() > 0;
+		}
 	}
 
 	private static class ReadOnlyBufferResultSubpartitionView extends DefaultBufferResultSubpartitionView
{
@@ -197,14 +202,19 @@ public class PartitionRequestQueueTest {
 		}
 	}
 
-	private static class NotReleasedResultSubpartitionView extends NoOpResultSubpartitionView
{
+	private static class EmptyAlwaysAvailableResultSubpartitionView extends NoOpResultSubpartitionView
{
 		@Override
 		public boolean isReleased() {
 			return false;
 		}
+
+		@Override
+		public boolean isAvailable() {
+			return true;
+		}
 	}
 
-	private static class ReleasedResultSubpartitionView extends NoOpResultSubpartitionView {
+	private static class ReleasedResultSubpartitionView extends EmptyAlwaysAvailableResultSubpartitionView
{
 		@Override
 		public boolean isReleased() {
 			return true;
@@ -263,6 +273,11 @@ public class PartitionRequestQueueTest {
 		public boolean nextBufferIsEvent() {
 			return true;
 		}
+
+		@Override
+		public boolean isAvailable() {
+			return true;
+		}
 	}
 
 	/**
@@ -387,5 +402,10 @@ public class PartitionRequestQueueTest {
 		public boolean nextBufferIsEvent() {
 			return false;
 		}
+
+		@Override
+		public boolean isAvailable() {
+			return false;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 528f0e2..ee678ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Future;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
@@ -142,6 +143,48 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		}
 	}
 
+	/**
+	 * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers,
otherwise we would
+	 * busy loop on the unfinished BufferConsumers.
+	 */
+	@Test
+	public void testUnfinishedBufferBehindFinished() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
+		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
+		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
+
+		try {
+			subpartition.add(createFilledBufferConsumer(1025)); // finished
+			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+			assertNextBuffer(readView, 1025, false, 1);
+		} finally {
+			subpartition.release();
+		}
+	}
+
+	/**
+	 * After flush call unfinished BufferConsumers should be reported as available, otherwise
we might not flush some
+	 * of the data.
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
+		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
+		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
+
+		try {
+			subpartition.add(createFilledBufferConsumer(1025)); // finished
+			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+			subpartition.flush();
+
+			assertNextBuffer(readView, 1025, true, 1);
+			assertNextBuffer(readView, 1024, false, 1);
+		} finally {
+			subpartition.release();
+		}
+	}
+
 	@Test
 	public void testMultipleEmptyBuffers() throws Exception {
 		final ResultSubpartition subpartition = createSubpartition();
@@ -188,6 +231,16 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	@Test
+	public void testEmptyFlush() throws Exception {
+		final PipelinedSubpartition subpartition = createSubpartition();
+
+		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
+		subpartition.createReadView(listener);
+		subpartition.flush();
+		assertEquals(0, listener.getNumNotifications());
+	}
+
+	@Test
 	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index a6be748..e41a85c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -53,7 +53,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -319,7 +318,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(2, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		read.buffer().recycleBuffer();
-		assertEquals(2, listener.getNumNotifications());
+		assertTrue(read.isMoreAvailable());
+		assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true,
no need for notification
 		assertFalse(bufferConsumer.isRecycled());
 		assertFalse(read.nextBufferIsEvent());
 
@@ -332,8 +332,9 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
 		assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
 
-		listener.awaitNotifications(3, 30_000);
-		assertEquals(3, listener.getNumNotifications());
+		listener.awaitNotifications(2, 30_000);
+		// Spiller finished
+		assertEquals(2, listener.getNumNotifications());
 
 		assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
 		read = reader.getNextBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 1b861df..215726b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -30,6 +30,7 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -143,4 +144,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
 		assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
 	}
+
+	protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException
{
+		assertNull(readView.getNextBuffer());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
index a8251a8..a60fa3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
@@ -53,6 +53,14 @@ public class StreamNetworkThroughputBenchmarkTests {
 	}
 
 	@Test
+	public void largeRemoteAlwaysFlush() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		env.setUp(1, 1, 0, false);
+		env.executeBenchmark(1_000_000);
+		env.tearDown();
+	}
+
+	@Test
 	public void pointToMultiPointBenchmark() throws Exception {
 		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
 		benchmark.setUp(1, 100, 100);


Mime
View raw message