flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [30/35] flink git commit: [FLINK-8591][runtime] Pass unfinished bufferConsumers to subpartitions
Date Mon, 19 Feb 2018 14:08:23 GMT
[FLINK-8591][runtime] Pass unfinished bufferConsumers to subpartitions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b1e127f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b1e127f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b1e127f

Branch: refs/heads/master
Commit: 5b1e127f7b3acd8f82893dda394fbcb7fe93d20d
Parents: 98bd689
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Wed Jan 24 14:43:23 2018 +0100
Committer: Piotr Nowojski <piotr.nowojski@gmail.com>
Committed: Mon Feb 19 12:21:41 2018 +0100

----------------------------------------------------------------------
 .../serialization/SpanningRecordSerializer.java |   3 -
 .../io/network/api/writer/RecordWriter.java     |  96 +++++++--------
 .../api/writer/ResultPartitionWriter.java       |   8 ++
 .../CreditBasedSequenceNumberingViewReader.java |  38 +++---
 .../io/network/netty/PartitionRequestQueue.java |  79 ++++++------
 .../netty/SequenceNumberingViewReader.java      |  31 ++---
 .../partition/BufferAvailabilityListener.java   |   6 +-
 .../partition/PipelinedSubpartition.java        |  84 ++++++++++---
 .../partition/PipelinedSubpartitionView.java    |   4 +-
 .../io/network/partition/ResultPartition.java   |   7 ++
 .../network/partition/ResultSubpartition.java   |  14 ++-
 .../partition/ResultSubpartitionView.java       |   2 +-
 .../partition/SpillableSubpartition.java        |  11 ++
 .../partition/SpillableSubpartitionView.java    |  14 ++-
 .../partition/SpilledSubpartitionView.java      |   8 +-
 .../partition/consumer/InputChannel.java        |   5 +-
 .../partition/consumer/LocalInputChannel.java   |  42 +++----
 .../partition/consumer/RemoteInputChannel.java  |   5 +-
 .../partition/consumer/SingleInputGate.java     |  44 +++----
 .../partition/consumer/UnionInputGate.java      |  21 ++++
 .../partition/consumer/UnknownInputChannel.java |   3 +-
 .../operators/shipping/OutputCollector.java     |  17 ++-
 ...AbstractCollectingResultPartitionWriter.java |  20 ++-
 .../io/network/api/writer/RecordWriterTest.java |  12 +-
 .../network/buffer/BufferBuilderTestUtils.java  |   4 +
 .../netty/CancelPartitionRequestTest.java       |   9 +-
 .../netty/PartitionRequestQueueTest.java        |  90 ++++++++++----
 .../netty/ServerTransportErrorHandlingTest.java |   2 +-
 .../AwaitableBufferAvailablityListener.java     |  47 +++++++
 .../NoOpBufferAvailablityListener.java          |  28 +++++
 .../PartialConsumePipelinedResultTest.java      |   2 +-
 .../partition/PipelinedSubpartitionTest.java    | 123 ++++++++++++++++---
 .../partition/SpillableSubpartitionTest.java    |  47 ++-----
 .../network/partition/SubpartitionTestBase.java |  13 ++
 .../partition/consumer/InputChannelTest.java    |   5 +-
 .../IteratorWrappingTestSingleInputGate.java    |  10 +-
 .../consumer/LocalInputChannelTest.java         |  16 +--
 .../partition/consumer/SingleInputGateTest.java |   2 +-
 .../partition/consumer/TestInputChannel.java    |  14 ++-
 .../network/util/TestSubpartitionConsumer.java  |  27 ++--
 .../flink/streaming/api/graph/StreamConfig.java |   4 +
 .../runtime/io/RecordWriterOutput.java          |   4 -
 .../runtime/io/StreamRecordWriter.java          |  11 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  13 +-
 .../consumer/StreamTestSingleInputGate.java     |  16 ++-
 .../runtime/io/StreamRecordWriterTest.java      | 113 -----------------
 46 files changed, 672 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index e1d7fb1..ba8e659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -148,9 +148,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public void clear() {
-		if (targetBuffer != null) {
-			targetBuffer.finish();
-		}
 		targetBuffer = null;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 801e6eb..51dfbde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -61,7 +61,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	/** {@link RecordSerializer} per outgoing channel. */
 	private final RecordSerializer<T>[] serializers;
 
-	private final Optional<BufferConsumer>[] bufferConsumers;
+	private final Optional<BufferBuilder>[] bufferBuilders;
 
 	private final Random rng = new XORShiftRandom();
 
@@ -84,10 +84,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 		 * serializer.
 		 */
 		this.serializers = new SpanningRecordSerializer[numChannels];
-		this.bufferConsumers = new Optional[numChannels];
+		this.bufferBuilders = new Optional[numChannels];
 		for (int i = 0; i < numChannels; i++) {
 			serializers[i] = new SpanningRecordSerializer<T>();
-			bufferConsumers[i] = Optional.empty();
+			bufferBuilders[i] = Optional.empty();
 		}
 	}
 
@@ -117,28 +117,24 @@ public class RecordWriter<T extends IOReadableWritable> {
 	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
 		RecordSerializer<T> serializer = serializers[targetChannel];
 
-		synchronized (serializer) {
-			SerializationResult result = serializer.addRecord(record);
-
-			while (result.isFullBuffer()) {
-				if (tryWriteAndClearBuffer(targetChannel, serializer)) {
-					// If this was a full record, we are done. Not breaking
-					// out of the loop at this point will lead to another
-					// buffer request before breaking out (that would not be
-					// a problem per se, but it can lead to stalls in the
-					// pipeline).
-					if (result.isFullRecord()) {
-						break;
-					}
+		SerializationResult result = serializer.addRecord(record);
+
+		while (result.isFullBuffer()) {
+			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+				// If this was a full record, we are done. Not breaking
+				// out of the loop at this point will lead to another
+				// buffer request before breaking out (that would not be
+				// a problem per se, but it can lead to stalls in the
+				// pipeline).
+				if (result.isFullRecord()) {
+					break;
 				}
-				BufferBuilder bufferBuilder =
-					targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-				checkState(!bufferConsumers[targetChannel].isPresent());
-				bufferConsumers[targetChannel] = Optional.of(bufferBuilder.createBufferConsumer());
-				result = serializer.setNextBufferBuilder(bufferBuilder);
 			}
-			checkState(!serializer.hasSerializedData(), "All data should be written at once");
+			BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+			result = serializer.setNextBufferBuilder(bufferBuilder);
 		}
+		checkState(!serializer.hasSerializedData(), "All data should be written at once");
 	}
 
 	public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
@@ -146,34 +142,24 @@ public class RecordWriter<T extends IOReadableWritable> {
 			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
 				RecordSerializer<T> serializer = serializers[targetChannel];
 
-				synchronized (serializer) {
-					tryWriteAndClearBuffer(targetChannel, serializer);
+				tryFinishCurrentBufferBuilder(targetChannel, serializer);
 
-					// retain the buffer so that it can be recycled by each channel of targetPartition
-					targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
-				}
+				// retain the buffer so that it can be recycled by each channel of targetPartition
+				targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
 			}
 			return eventBufferConsumer;
 		}
 	}
 
-	public void flush() throws IOException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				tryWriteAndClearBuffer(targetChannel, serializer);
-			}
-		}
+	public void flush() {
+		targetPartition.flush();
 	}
 
 	public void clearBuffers() {
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
 			RecordSerializer<?> serializer = serializers[targetChannel];
-			synchronized (serializer) {
-				closeBufferConsumer(targetChannel);
-				serializer.clear();
-			}
+			closeBufferConsumer(targetChannel);
+			serializer.clear();
 		}
 	}
 
@@ -185,33 +171,35 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * Tries to consume serialized data and (if data present) writes them to the {@link ResultPartitionWriter}.
-	 * After writing it clean ups the state.
-	 *
-	 * <p><b>Needs to be synchronized on the serializer!</b>
+	 * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
 	 *
 	 * @return true if some data were written
 	 */
-	private boolean tryWriteAndClearBuffer(
-			int targetChannel,
-			RecordSerializer<T> serializer) throws IOException {
+	private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) {
 
-		if (!bufferConsumers[targetChannel].isPresent()) {
+		if (!bufferBuilders[targetChannel].isPresent()) {
 			return false;
 		}
-		BufferConsumer bufferConsumer = bufferConsumers[targetChannel].get();
-		bufferConsumers[targetChannel] = Optional.empty();
+		BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+		bufferBuilders[targetChannel] = Optional.empty();
 
-		numBytesOut.inc(bufferConsumer.getWrittenBytes());
+		numBytesOut.inc(bufferBuilder.finish());
 		serializer.clear();
-		targetPartition.addBufferConsumer(bufferConsumer, targetChannel);
 		return true;
 	}
 
+	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
+		checkState(!bufferBuilders[targetChannel].isPresent());
+		BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+		bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
+		targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
+		return bufferBuilder;
+	}
+
 	private void closeBufferConsumer(int targetChannel) {
-		if (bufferConsumers[targetChannel].isPresent()) {
-			bufferConsumers[targetChannel].get().close();
-			bufferConsumers[targetChannel] = Optional.empty();
+		if (bufferBuilders[targetChannel].isPresent()) {
+			bufferBuilders[targetChannel].get().finish();
+			bufferBuilders[targetChannel] = Optional.empty();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index caefb52..02049d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -45,6 +45,14 @@ public interface ResultPartitionWriter {
 	 *
 	 * <p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing
 	 * it's resources.
+	 *
+	 * <p>To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one
+	 * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
 	 */
 	void addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException;
+
+	/**
+	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers}.
+	 */
+	void flush();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index 5ebf62d..d02b2bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the new network credit-based mode.
@@ -44,7 +44,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 
 	private final InputChannelID receiverId;
 
-	private final AtomicLong numBuffersAvailable = new AtomicLong();
+	private final AtomicBoolean buffersAvailable = new AtomicBoolean();
 
 	private final PartitionRequestQueue requestQueue;
 
@@ -118,7 +118,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 	@Override
 	public boolean isAvailable() {
 		// BEWARE: this must be in sync with #isAvailable()!
-		return numBuffersAvailable.get() > 0 &&
+		return buffersAvailable.get() &&
 			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
 	}
 
@@ -131,11 +131,9 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 	 *
 	 * @param bufferAndBacklog
 	 * 		current buffer and backlog including information about the next buffer
-	 * @param remaining
-	 * 		remaining number of queued buffers, i.e. <tt>numBuffersAvailable.get()</tt>
 	 */
-	private boolean isAvailable(BufferAndBacklog bufferAndBacklog, long remaining) {
-		return remaining > 0 &&
+	private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+		return bufferAndBacklog.isMoreAvailable() &&
 			(numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent());
 	}
 
@@ -155,27 +153,23 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 	}
 
 	@VisibleForTesting
-	long getNumBuffersAvailable() {
-		return numBuffersAvailable.get();
+	boolean hasBuffersAvailable() {
+		return buffersAvailable.get();
 	}
 
 	@Override
 	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 		BufferAndBacklog next = subpartitionView.getNextBuffer();
 		if (next != null) {
-			long remaining = numBuffersAvailable.decrementAndGet();
+			buffersAvailable.set(next.isMoreAvailable());
 			sequenceNumber++;
 
-			if (remaining < 0) {
-				throw new IllegalStateException("no buffer available");
-			}
-
 			if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
 				throw new IllegalStateException("no credit available");
 			}
 
 			return new BufferAndAvailability(
-				next.buffer(), isAvailable(next, remaining), next.buffersInBacklog());
+				next.buffer(), isAvailable(next), next.buffersInBacklog());
 		} else {
 			return null;
 		}
@@ -202,11 +196,9 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long numBuffers) {
-		// if this request made the channel non-empty, notify the input gate
-		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
-			requestQueue.notifyReaderNonEmpty(this);
-		}
+	public void notifyDataAvailable() {
+		buffersAvailable.set(true);
+		requestQueue.notifyReaderNonEmpty(this);
 	}
 
 	@Override
@@ -214,7 +206,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
 		return "CreditBasedSequenceNumberingViewReader{" +
 			"requestLock=" + requestLock +
 			", receiverId=" + receiverId +
-			", numBuffersAvailable=" + numBuffersAvailable.get() +
+			", buffersAvailable=" + buffersAvailable.get() +
 			", sequenceNumber=" + sequenceNumber +
 			", numCreditsAvailable=" + numCreditsAvailable +
 			", isRegisteredAsAvailable=" + isRegisteredAsAvailable +

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 4832442..8d43815 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -103,18 +103,17 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	 * availability, so there is no race condition here.
 	 */
 	private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
-		if (!reader.isRegisteredAsAvailable() && reader.isAvailable()) {
-			// Queue an available reader for consumption. If the queue is empty,
-			// we try trigger the actual write. Otherwise this will be handled by
-			// the writeAndFlushNextMessageIfPossible calls.
-			boolean triggerWrite = availableReaders.isEmpty();
-			availableReaders.add(reader);
-
-			reader.setRegisteredAsAvailable(true);
-
-			if (triggerWrite) {
-				writeAndFlushNextMessageIfPossible(ctx.channel());
-			}
+		if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
+			return;
+		}
+		// Queue an available reader for consumption. If the queue is empty,
+		// we try trigger the actual write. Otherwise this will be handled by
+		// the writeAndFlushNextMessageIfPossible calls.
+		boolean triggerWrite = availableReaders.isEmpty();
+		registerAvailableReader(reader);
+
+		if (triggerWrite) {
+			writeAndFlushNextMessageIfPossible(ctx.channel());
 		}
 	}
 
@@ -183,13 +182,12 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 			// Cancel the request for the input channel
 			int size = availableReaders.size();
 			for (int i = 0; i < size; i++) {
-				NetworkSequenceViewReader reader = availableReaders.poll();
+				NetworkSequenceViewReader reader = pollAvailableReader();
 				if (reader.getReceiverId().equals(toCancel)) {
 					reader.releaseAllResources();
-					reader.setRegisteredAsAvailable(false);
 					markAsReleased(reader.getReceiverId());
 				} else {
-					availableReaders.add(reader);
+					registerAvailableReader(reader);
 				}
 			}
 
@@ -216,7 +214,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		BufferAndAvailability next = null;
 		try {
 			while (true) {
-				NetworkSequenceViewReader reader = availableReaders.poll();
+				NetworkSequenceViewReader reader = pollAvailableReader();
 
 				// No queue with available data. We allow this here, because
 				// of the write callbacks that are executed after each write.
@@ -226,32 +224,24 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 				next = reader.getNextBuffer();
 				if (next == null) {
-					if (reader.isReleased()) {
-						markAsReleased(reader.getReceiverId());
+					if (!reader.isReleased()) {
+						continue;
+					}
+					markAsReleased(reader.getReceiverId());
 
-						Throwable cause = reader.getFailureCause();
-						if (cause != null) {
-							ErrorResponse msg = new ErrorResponse(
-								new ProducerFailedException(cause),
-								reader.getReceiverId());
-
-							ctx.writeAndFlush(msg);
-						}
-					} else {
-						IllegalStateException err = new IllegalStateException(
-							"Bug in Netty consumer logic: reader queue got notified by partition " +
-								"about available data, but none was available.");
-						handleException(ctx.channel(), err);
-						return;
+					Throwable cause = reader.getFailureCause();
+					if (cause != null) {
+						ErrorResponse msg = new ErrorResponse(
+							new ProducerFailedException(cause),
+							reader.getReceiverId());
+
+						ctx.writeAndFlush(msg);
 					}
 				} else {
 					// This channel was now removed from the available reader queue.
-					// We re-add it into the queue if it is still available, otherwise we will
-					// notify the reader about the changed channel availability registration.
+					// We re-add it into the queue if it is still available
 					if (next.moreAvailable()) {
-						availableReaders.add(reader);
-					} else {
-						reader.setRegisteredAsAvailable(false);
+						registerAvailableReader(reader);
 					}
 
 					BufferResponse msg = new BufferResponse(
@@ -283,6 +273,19 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		}
 	}
 
+	private void registerAvailableReader(NetworkSequenceViewReader reader) {
+		availableReaders.add(reader);
+		reader.setRegisteredAsAvailable(true);
+	}
+
+	private NetworkSequenceViewReader pollAvailableReader() {
+		NetworkSequenceViewReader reader = availableReaders.poll();
+		if (reader != null) {
+			reader.setRegisteredAsAvailable(false);
+		}
+		return reader;
+	}
+
 	private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
 		return EventSerializer.isEvent(buffer, EndOfPartitionEvent.class,
 			getClass().getClassLoader());
@@ -301,7 +304,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	}
 
 	private void handleException(Channel channel, Throwable cause) throws IOException {
-		LOG.debug("Encountered error while consuming partitions", cause);
+		LOG.error("Encountered error while consuming partitions", cause);
 
 		fatalError = true;
 		releaseAllResources();

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 0ec5fcb..2d9635c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -19,17 +19,17 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the old network mode.
@@ -43,7 +43,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 
 	private final InputChannelID receiverId;
 
-	private final AtomicLong numBuffersAvailable = new AtomicLong();
+	private final AtomicBoolean buffersAvailable = new AtomicBoolean();
 
 	private final PartitionRequestQueue requestQueue;
 
@@ -51,6 +51,8 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 
 	private int sequenceNumber = -1;
 
+	private boolean isRegisteredAvailable;
+
 	SequenceNumberingViewReader(InputChannelID receiverId, PartitionRequestQueue requestQueue) {
 		this.receiverId = receiverId;
 		this.requestQueue = requestQueue;
@@ -84,16 +86,17 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 
 	@Override
 	public void setRegisteredAsAvailable(boolean isRegisteredAvailable) {
+		this.isRegisteredAvailable = isRegisteredAvailable;
 	}
 
 	@Override
 	public boolean isRegisteredAsAvailable() {
-		return false;
+		return isRegisteredAvailable;
 	}
 
 	@Override
 	public boolean isAvailable() {
-		return true;
+		return buffersAvailable.get();
 	}
 
 	@Override
@@ -110,14 +113,9 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 		BufferAndBacklog next = subpartitionView.getNextBuffer();
 		if (next != null) {
-			long remaining = numBuffersAvailable.decrementAndGet();
+			buffersAvailable.set(next.isMoreAvailable());
 			sequenceNumber++;
-
-			if (remaining >= 0) {
-				return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog());
-			} else {
-				throw new IllegalStateException("no buffer available");
-			}
+			return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog());
 		} else {
 			return null;
 		}
@@ -144,11 +142,9 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long numBuffers) {
-		// if this request made the channel non-empty, notify the input gate
-		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
-			requestQueue.notifyReaderNonEmpty(this);
-		}
+	public void notifyDataAvailable() {
+		buffersAvailable.set(true);
+		requestQueue.notifyReaderNonEmpty(this);
 	}
 
 	@Override
@@ -156,7 +152,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
 		return "SequenceNumberingViewReader{" +
 			"requestLock=" + requestLock +
 			", receiverId=" + receiverId +
-			", numBuffersAvailable=" + numBuffersAvailable.get() +
 			", sequenceNumber=" + sequenceNumber +
 			'}';
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
index 114ef7c..e78f99a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
@@ -25,9 +25,7 @@ package org.apache.flink.runtime.io.network.partition;
 public interface BufferAvailabilityListener {
 
 	/**
-	 * Called whenever a new number of buffers becomes available.
-	 *
-	 * @param numBuffers The number of buffers that became available.
+	 * Called whenever there might be new data available.
 	 */
-	void notifyBuffersAvailable(long numBuffers);
+	void notifyDataAvailable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2fa512a..dcaa360 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -63,6 +63,15 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
+	public void flush() {
+		synchronized (buffers) {
+			if (readView != null) {
+				readView.notifyDataAvailable();
+			}
+		}
+	}
+
+	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
 		LOG.debug("Finished {}.", this);
@@ -84,10 +93,10 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			if (finish) {
 				isFinished = true;
+				notifyDataAvailable();
 			}
-
-			if (readView != null) {
-				readView.notifyBuffersAvailable(1);
+			else {
+				maybeNotifyDataAvailable();
 			}
 		}
 
@@ -127,19 +136,42 @@ class PipelinedSubpartition extends ResultSubpartition {
 	@Nullable
 	BufferAndBacklog pollBuffer() {
 		synchronized (buffers) {
-			BufferConsumer bufferConsumer = buffers.peek();
-			if (bufferConsumer == null) {
-				return null;
+			Buffer buffer = null;
+
+			while (!buffers.isEmpty()) {
+				BufferConsumer bufferConsumer = buffers.peek();
+
+				buffer = bufferConsumer.build();
+				checkState(bufferConsumer.isFinished() || buffers.size() == 1,
+					"When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
+
+				if (bufferConsumer.isFinished()) {
+					buffers.pop().close();
+					decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+				}
+				if (buffer.readableBytes() > 0) {
+					break;
+				}
+				buffer.recycleBuffer();
+				buffer = null;
+				if (!bufferConsumer.isFinished()) {
+					break;
+				}
 			}
 
-			Buffer buffer = bufferConsumer.build();
-			if (bufferConsumer.isFinished()) {
-				buffers.pop().close();
-				decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+			if (buffer == null) {
+				return null;
 			}
 
 			updateStatistics(buffer);
-			return new BufferAndBacklog(buffer, getBuffersInBacklog(), _nextBufferIsEvent());
+			// Do not report the last remaining buffer in the queue as available to read (assuming it is unfinished).
+			// It will be reported as readable either on flush or once the number of buffers in the queue
+			// reaches 2 or more.
+			return new BufferAndBacklog(
+				buffer,
+				getNumberOfFinishedBuffers() > 0,
+				getBuffersInBacklog(),
+				_nextBufferIsEvent());
 		}
 	}
 
@@ -169,8 +201,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
-		final int queueSize;
-
 		synchronized (buffers) {
 			checkState(!isReleased);
 			checkState(readView == null,
@@ -179,12 +209,12 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
 
-			queueSize = buffers.size();
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
+			if (!buffers.isEmpty()) {
+				readView.notifyDataAvailable();
+			}
 		}
 
-		readView.notifyBuffersAvailable(queueSize);
-
 		return readView;
 	}
 
@@ -220,4 +250,26 @@ class PipelinedSubpartition extends ResultSubpartition {
 		// since we do not synchronize, the size may actually be lower than 0!
 		return Math.max(buffers.size(), 0);
 	}
+
+	private void maybeNotifyDataAvailable() {
+		// Notify only when we added first finished buffer.
+		if (getNumberOfFinishedBuffers() == 1) {
+			notifyDataAvailable();
+		}
+	}
+
+	private void notifyDataAvailable() {
+		if (readView != null) {
+			readView.notifyDataAvailable();
+		}
+	}
+
+	private int getNumberOfFinishedBuffers() {
+		if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
+			return 1;
+		}
+
+		// We assume that only the last buffer may be unfinished.
+		return Math.max(0, buffers.size() - 1);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 21abd04..c60a604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -52,8 +52,8 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long numBuffers) {
-		availabilityListener.notifyBuffersAvailable(numBuffers);
+	public void notifyDataAvailable() {
+		availabilityListener.notifyDataAvailable();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 9be261e..25a076b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -257,6 +257,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		}
 	}
 
+	@Override
+	public void flush() {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
+	}
+
 	/**
 	 * Finishes the result partition.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 572cde7..adc0ed3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -93,6 +93,9 @@ public abstract class ResultSubpartition {
 	 * <p>The request may be executed synchronously, or asynchronously, depending on the
 	 * implementation.
 	 *
+	 * <p><strong>IMPORTANT:</strong> Before adding a new {@link BufferConsumer}, the previously added one must be in
+	 * a finished state. For performance reasons, this is only enforced during data reading.
+	 *
 	 * @param bufferConsumer
 	 * 		the buffer to add (transferring ownership to this writer)
 	 * @return true if operation succeeded and bufferConsumer was enqueued for consumption.
@@ -101,6 +104,8 @@ public abstract class ResultSubpartition {
 	 */
 	abstract public boolean add(BufferConsumer bufferConsumer) throws IOException;
 
+	abstract public void flush();
+
 	abstract public void finish() throws IOException;
 
 	abstract public void release() throws IOException;
@@ -170,12 +175,14 @@ public abstract class ResultSubpartition {
 	public static final class BufferAndBacklog {
 
 		private final Buffer buffer;
+		private final boolean isMoreAvailable;
 		private final int buffersInBacklog;
 		private final boolean nextBufferIsEvent;
 
-		public BufferAndBacklog(Buffer buffer, int buffersInBacklog, boolean nextBufferIsEvent) {
+		public BufferAndBacklog(Buffer buffer, boolean isMoreAvailable, int buffersInBacklog, boolean nextBufferIsEvent) {
 			this.buffer = checkNotNull(buffer);
 			this.buffersInBacklog = buffersInBacklog;
+			this.isMoreAvailable = isMoreAvailable;
 			this.nextBufferIsEvent = nextBufferIsEvent;
 		}
 
@@ -183,10 +190,15 @@ public abstract class ResultSubpartition {
 			return buffer;
 		}
 
+		public boolean isMoreAvailable() {
+			return isMoreAvailable;
+		}
+
 		public int buffersInBacklog() {
 			return buffersInBacklog;
 		}
 
+
 		public boolean nextBufferIsEvent() {
 			return nextBufferIsEvent;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 9b0344e..41fbb0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -43,7 +43,7 @@ public interface ResultSubpartitionView {
 	@Nullable
 	BufferAndBacklog getNextBuffer() throws IOException, InterruptedException;
 
-	void notifyBuffersAvailable(long buffers);
+	void notifyDataAvailable();
 
 	void releaseAllResources() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4b9f59f..8758b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -115,11 +115,22 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
+	public void flush() {
+		synchronized (buffers) {
+			if (readView != null) {
+				readView.notifyDataAvailable();
+			}
+		}
+	}
+
+	@Override
 	public synchronized void finish() throws IOException {
 		synchronized (buffers) {
 			if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE))) {
 				isFinished = true;
 			}
+
+			flush();
 		}
 
 		// If we are spilling/have spilled, wait for the writer to finish

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6c173a3..789b3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -89,7 +89,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 		}
 
 		if (nextBuffer != null) {
-			listener.notifyBuffersAvailable(1);
+			listener.notifyDataAvailable();
 		}
 	}
 
@@ -143,20 +143,24 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 		Buffer current = null;
 		boolean nextBufferIsEvent = false;
 		int newBacklog = 0; // this is always correct if current is non-null!
+		boolean isMoreAvailable = false;
 
 		synchronized (buffers) {
 			if (isReleased.get()) {
 				return null;
 			} else if (nextBuffer != null) {
 				current = nextBuffer.build();
+
 				if (nextBuffer.isFinished()) {
 					newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
 					nextBuffer.close();
 					nextBuffer = buffers.poll();
 				}
 
+				isMoreAvailable = buffers.size() > 0;
 				if (nextBuffer != null) {
-					listener.notifyBuffersAvailable(1);
+					isMoreAvailable = true;
+					listener.notifyDataAvailable();
 					nextBufferIsEvent = !nextBuffer.isBuffer();
 				}
 
@@ -164,7 +168,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 				// if we are spilled (but still process a non-spilled nextBuffer), we don't know the
 				// state of nextBufferIsEvent...
 				if (spilledView == null) {
-					return new BufferAndBacklog(current, newBacklog, nextBufferIsEvent);
+					return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
 				}
 			}
 		} // else: spilled
@@ -172,7 +176,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 		SpilledSubpartitionView spilled = spilledView;
 		if (spilled != null) {
 			if (current != null) {
-				return new BufferAndBacklog(current, newBacklog, spilled.nextBufferIsEvent());
+				return new BufferAndBacklog(current, isMoreAvailable, newBacklog, spilled.nextBufferIsEvent());
 			} else {
 				return spilled.getNextBuffer();
 			}
@@ -182,7 +186,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long buffers) {
+	public void notifyDataAvailable() {
 		// We do the availability listener notification one by one
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index d1917e6..4c5cd2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -105,7 +105,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 		// Otherwise, we notify only when the spill writer callback happens.
 		if (!spillWriter.registerAllRequestsProcessedListener(this)) {
 			isSpillInProgress = false;
-			availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+			availabilityListener.notifyDataAvailable();
 			LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
 		} else {
 			LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
@@ -120,7 +120,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 	@Override
 	public void onNotification() {
 		isSpillInProgress = false;
-		availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+		availabilityListener.notifyDataAvailable();
 		LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers);
 	}
 
@@ -148,7 +148,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 		}
 
 		int newBacklog = parent.decreaseBuffersInBacklog(current);
-		return new BufferAndBacklog(current, newBacklog, nextBufferIsEvent);
+		return new BufferAndBacklog(current, newBacklog > 0, newBacklog, nextBufferIsEvent);
 	}
 
 	@Nullable
@@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long buffers) {
+	public void notifyDataAvailable() {
 		// We do the availability listener notification either directly on
 		// construction of this view (when everything has been spilled) or
 		// as soon as spilling is done and we are notified about it in the

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 7b7edf7..3ce5866 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,9 +135,9 @@ public abstract class InputChannel {
 	abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
 
 	/**
-	 * Returns the next buffer from the consumed subpartition.
+	 * Returns the next buffer from the consumed subpartition or {@code Optional.empty()} if there is no data to return.
 	 */
-	abstract BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;
+	abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException;
 
 	// ------------------------------------------------------------------------
 	// Task events

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 8505666..f9c75ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -23,19 +23,19 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
-import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -57,9 +57,6 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	/** Number of available buffers used to keep track of non-empty gate notifications. */
-	private final AtomicLong numBuffersAvailable;
-
 	/** The consumed subpartition */
 	private volatile ResultSubpartitionView subpartitionView;
 
@@ -91,7 +88,6 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
-		this.numBuffersAvailable = new AtomicLong();
 	}
 
 	// ------------------------------------------------------------------------
@@ -166,11 +162,19 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	}
 
 	@Override
-	BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
+	Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
 		checkError();
 
 		ResultSubpartitionView subpartitionView = this.subpartitionView;
 		if (subpartitionView == null) {
+			// There is a possible race condition between writing an EndOfPartitionEvent (1) and flushing (3) the Local
+			// channel on the sender side, and reading the EndOfPartitionEvent (2) and processing the flush notification (4).
+			// When they happen in that order (1 - 2 - 3 - 4), the flush notification can re-enqueue the LocalInputChannel
+			// after (or while) it is being released while reading the EndOfPartitionEvent (2).
+			if (isReleased) {
+				return Optional.empty();
+			}
+
 			// this can happen if the request for the partition was triggered asynchronously
 			// by the time trigger
 			// would be good to avoid that, by guaranteeing that the requestPartition() and
@@ -185,31 +189,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 			if (subpartitionView.isReleased()) {
 				throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
 			} else {
-				// This means there is a bug in the buffer availability
-				// notifications.
-				throw new IllegalStateException("Consumed partition has no buffers available. " +
-					"Number of received buffer notifications is " + numBuffersAvailable + ".");
+				return Optional.empty();
 			}
 		}
 
-		long remaining = numBuffersAvailable.decrementAndGet();
-
-		if (remaining >= 0) {
-			numBytesIn.inc(next.buffer().getSizeUnsafe());
-			return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog());
-		} else if (subpartitionView.isReleased()) {
-			throw new ProducerFailedException(subpartitionView.getFailureCause());
-		} else {
-			throw new IllegalStateException("No buffer available and producer partition not released.");
-		}
+		numBytesIn.inc(next.buffer().getSizeUnsafe());
+		return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
 	}
 
 	@Override
-	public void notifyBuffersAvailable(long numBuffers) {
-		// if this request made the channel non-empty, notify the input gate
-		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
-			notifyChannelNonEmpty();
-		}
+	public void notifyDataAvailable() {
+		notifyChannelNonEmpty();
 	}
 
 	private ResultSubpartitionView checkAndWaitForSubpartitionView() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 8a8c7f5..8174359 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -42,6 +42,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -183,7 +184,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 	}
 
 	@Override
-	BufferAndAvailability getNextBuffer() throws IOException {
+	Optional<BufferAndAvailability> getNextBuffer() throws IOException {
 		checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
 		checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
 
@@ -198,7 +199,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
-		return new BufferAndAvailability(next, remaining > 0, getSenderBacklog());
+		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 337b3c2..04b8ee6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -509,39 +509,39 @@ public class SingleInputGate implements InputGate {
 
 		InputChannel currentChannel;
 		boolean moreAvailable;
-		synchronized (inputChannelsWithData) {
-			while (inputChannelsWithData.size() == 0) {
-				if (isReleased) {
-					throw new IllegalStateException("Released");
-				}
+		Optional<BufferAndAvailability> result = Optional.empty();
 
-				if (blocking) {
-					inputChannelsWithData.wait();
-				}
-				else {
-					return Optional.empty();
+		do {
+			synchronized (inputChannelsWithData) {
+				while (inputChannelsWithData.size() == 0) {
+					if (isReleased) {
+						throw new IllegalStateException("Released");
+					}
+
+					if (blocking) {
+						inputChannelsWithData.wait();
+					}
+					else {
+						return Optional.empty();
+					}
 				}
+
+				currentChannel = inputChannelsWithData.remove();
+				enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+				moreAvailable = inputChannelsWithData.size() > 0;
 			}
 
-			currentChannel = inputChannelsWithData.remove();
-			enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-			moreAvailable = inputChannelsWithData.size() > 0;
-		}
+			result = currentChannel.getNextBuffer();
+		} while (!result.isPresent());
 
-		final BufferAndAvailability result = currentChannel.getNextBuffer();
-		// Sanity check that notifications only happen when data is available
-		if (result == null) {
-			throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
-					"notified by channel about available data, but none was available.");
-		}
 		// this channel was now removed from the non-empty channels queue
 		// we re-add it in case it has more data, because in that case no "non-empty" notification
 		// will come for that channel
-		if (result.moreAvailable()) {
+		if (result.get().moreAvailable()) {
 			queueChannel(currentChannel);
 		}
 
-		final Buffer buffer = result.buffer();
+		final Buffer buffer = result.get().buffer();
 		if (buffer.isBuffer()) {
 			return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 14c04bc..5a547ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -26,12 +26,14 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Input gate wrapper to union the input from multiple input gates.
@@ -71,6 +73,11 @@ public class UnionInputGate implements InputGate, InputGateListener {
 	/** Gates, which notified this input gate about available data. */
 	private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();
 
+	/**
+	 * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}.
+	 */
+	private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>();
+
 	/** The total number of input channels across all unioned input gates. */
 	private final int totalNumberOfInputChannels;
 
@@ -163,12 +170,20 @@ public class UnionInputGate implements InputGate, InputGateListener {
 			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
 			&& inputGate.isFinished()) {
 
+			checkState(!bufferOrEvent.moreAvailable());
 			if (!inputGatesWithRemainingData.remove(inputGate)) {
 				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
 					"input gates.");
 			}
 		}
 
+		if (bufferOrEvent.moreAvailable()) {
+			// this buffer or event was now removed from the non-empty gates queue
+			// we re-add it in case it has more data, because in that case no "non-empty" notification
+			// will come for that gate
+			queueInputGate(inputGate);
+		}
+
 		// Set the channel index to identify the input channel (across all unioned input gates)
 		final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
@@ -190,6 +205,7 @@ public class UnionInputGate implements InputGate, InputGateListener {
 					inputGatesWithData.wait();
 				}
 				inputGate = inputGatesWithData.remove();
+				enqueuedInputGatesWithData.remove(inputGate);
 			}
 
 			// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
@@ -248,9 +264,14 @@ public class UnionInputGate implements InputGate, InputGateListener {
 		int availableInputGates;
 
 		synchronized (inputGatesWithData) {
+			if (enqueuedInputGatesWithData.contains(inputGate)) {
+				return;
+			}
+
 			availableInputGates = inputGatesWithData.size();
 
 			inputGatesWithData.add(inputGate);
+			enqueuedInputGatesWithData.add(inputGate);
 
 			if (availableInputGates == 0) {
 				inputGatesWithData.notifyAll();

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index d887ab6..1101f66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,7 +77,7 @@ class UnknownInputChannel extends InputChannel {
 	}
 
 	@Override
-	public BufferAndAvailability getNextBuffer() throws IOException {
+	public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
 		// Nothing to do here
 		throw new UnsupportedOperationException("Cannot retrieve a buffer from an UnknownInputChannel");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index 3526e96..382ae39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.runtime.operators.shipping;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * The OutputCollector collects records, and emits them to the  {@link RecordWriter}s.
  * The OutputCollector tracks to which writers a deep-copy must be given and which not.
@@ -81,11 +81,8 @@ public class OutputCollector<T> implements Collector<T> {
 	@Override
 	public void close() {
 		for (RecordWriter<?> writer : writers) {
-			try {
-				writer.flush();
-			} catch (IOException e) {
-				throw new RuntimeException(e.getMessage(), e);
-			}
+			writer.clearBuffers();
+			writer.flush();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 5a7d20a..b2171c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -32,6 +34,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * {@link ResultPartitionWriter} that collects output on the List.
  */
+@ThreadSafe
 public abstract class AbstractCollectingResultPartitionWriter implements ResultPartitionWriter {
 	private final BufferProvider bufferProvider;
 	private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
@@ -61,13 +64,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 	}
 
 	@Override
-	public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
+	public synchronized void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
 		checkState(targetChannel < getNumberOfSubpartitions());
-
 		bufferConsumers.add(bufferConsumer);
+		processBufferConsumers();
+	}
 
+	private void processBufferConsumers() throws IOException {
 		while (!bufferConsumers.isEmpty()) {
-			bufferConsumer = bufferConsumers.peek();
+			BufferConsumer bufferConsumer = bufferConsumers.peek();
 			Buffer buffer = bufferConsumer.build();
 			try {
 				deserializeBuffer(buffer);
@@ -82,5 +87,14 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 		}
 	}
 
+	@Override
+	public synchronized void flush() {
+		try {
+			processBufferConsumers();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
 	protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 95d6655..ed32454 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -62,10 +62,7 @@ import java.util.concurrent.Future;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -180,7 +177,6 @@ public class RecordWriterTest {
 
 		// Fill a buffer, but don't write it out.
 		recordWriter.emit(new IntValue(0));
-		verify(partitionWriter, never()).addBufferConsumer(any(BufferConsumer.class), anyInt());
 
 		// Clear all buffers.
 		recordWriter.clearBuffers();
@@ -428,6 +424,10 @@ public class RecordWriterTest {
 		public void addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException {
 			queues[targetChannel].add(buffer);
 		}
+
+		@Override
+		public void flush() {
+		}
 	}
 
 	private static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
@@ -477,6 +477,10 @@ public class RecordWriterTest {
 		public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
 			bufferConsumer.close();
 		}
+
+		@Override
+		public void flush() {
+		}
 	}
 
 	private static class ByteArrayIO implements IOReadableWritable {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index c6b8599..ead42df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -30,6 +30,10 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class BufferBuilderTestUtils {
 	public static final int BUFFER_SIZE = 32 * 1024;
 
+	public static BufferBuilder createBufferBuilder() {
+		return createBufferBuilder(BUFFER_SIZE);
+	}
+
 	public static BufferBuilder createBufferBuilder(int size) {
 		return createFilledBufferBuilder(size, 0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 4c4939b..56abff1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
@@ -88,7 +87,7 @@ public class CancelPartitionRequestTest {
 					@Override
 					public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
 						BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
-						listener.notifyBuffersAvailable(Long.MAX_VALUE);
+						listener.notifyDataAvailable();
 						return view;
 					}
 				});
@@ -139,7 +138,7 @@ public class CancelPartitionRequestTest {
 						@Override
 						public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
 							BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
-							listener.notifyBuffersAvailable(Long.MAX_VALUE);
+							listener.notifyDataAvailable();
 							return view;
 						}
 					});
@@ -194,11 +193,11 @@ public class CancelPartitionRequestTest {
 		public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
 			Buffer buffer = bufferProvider.requestBufferBlocking();
 			buffer.setSize(buffer.getMaxCapacity()); // fake some data
-			return new BufferAndBacklog(buffer, 0, false);
+			return new BufferAndBacklog(buffer, true, 0, false);
 		}
 
 		@Override
-		public void notifyBuffersAvailable(long buffers) {
+		public void notifyDataAvailable() {
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 69a0e11..16418ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
@@ -52,6 +53,43 @@ import static org.junit.Assert.assertTrue;
  */
 public class PartitionRequestQueueTest {
 
+	/**
+	 * In case of enqueuing an empty reader and a reader that actually has some buffers while the channel is not
+	 * writable, the channel writability change event should result in reading all of the messages.
+	 */
+	@Test
+	public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
+		final int buffersToWrite = 5;
+		PartitionRequestQueue queue = new PartitionRequestQueue();
+		EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+		CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue);
+		CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue);
+
+		reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new NotReleasedResultSubpartitionView(), new ResultPartitionID(), 0);
+		reader1.notifyDataAvailable();
+		assertTrue(reader1.isAvailable());
+		assertFalse(reader1.isRegisteredAsAvailable());
+
+		channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
+		assertFalse(channel.isWritable());
+
+		reader1.notifyDataAvailable();
+		channel.runPendingTasks();
+
+		reader2.notifyDataAvailable();
+		reader2.requestSubpartitionView((partitionId, index, availabilityListener) -> new DefaultBufferResultSubpartitionView(buffersToWrite), new ResultPartitionID(), 0);
+		assertTrue(reader2.isAvailable());
+		assertFalse(reader2.isRegisteredAsAvailable());
+
+		reader2.notifyDataAvailable();
+
+		// changing a channel writability should result in draining both reader1 and reader2
+		channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
+		channel.runPendingTasks();
+		assertEquals(buffersToWrite, channel.outboundMessages().size());
+	}
+
 	@Test
 	public void testProducerFailedException() throws Exception {
 		PartitionRequestQueue queue = new PartitionRequestQueue();
@@ -66,7 +104,7 @@ public class PartitionRequestQueueTest {
 		CreditBasedSequenceNumberingViewReader seqView = new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
 		seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
 		// Add available buffer to trigger enqueue the erroneous view
-		seqView.notifyBuffersAvailable(1);
+		seqView.notifyDataAvailable();
 
 		ch.runPendingTasks();
 
@@ -84,7 +122,7 @@ public class PartitionRequestQueueTest {
 	 */
 	@Test
 	public void testDefaultBufferWriting() throws Exception {
-		testBufferWriting(new DefaultBufferResultSubpartitionView(2));
+		testBufferWriting(new DefaultBufferResultSubpartitionView(1));
 	}
 
 	/**
@@ -92,7 +130,7 @@ public class PartitionRequestQueueTest {
 	 */
 	@Test
 	public void testReadOnlyBufferWriting() throws Exception {
-		testBufferWriting(new ReadOnlyBufferResultSubpartitionView(2));
+		testBufferWriting(new ReadOnlyBufferResultSubpartitionView(1));
 	}
 
 	private void testBufferWriting(ResultSubpartitionView view) throws IOException {
@@ -108,7 +146,7 @@ public class PartitionRequestQueueTest {
 		reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
 
 		// notify about buffer availability and encode one buffer
-		reader.notifyBuffersAvailable(1);
+		reader.notifyDataAvailable();
 
 		channel.runPendingTasks();
 
@@ -124,37 +162,45 @@ public class PartitionRequestQueueTest {
 
 	private static class DefaultBufferResultSubpartitionView extends NoOpResultSubpartitionView {
 		/** Number of buffer in the backlog to report with every {@link #getNextBuffer()} call. */
-		final int buffersInBacklog;
+		private final AtomicInteger buffersInBacklog;
 
 		private DefaultBufferResultSubpartitionView(int buffersInBacklog) {
-			this.buffersInBacklog = buffersInBacklog;
+			this.buffersInBacklog = new AtomicInteger(buffersInBacklog);
 		}
 
 		@Nullable
 		@Override
 		public BufferAndBacklog getNextBuffer() {
+			int buffers = buffersInBacklog.decrementAndGet();
 			return new BufferAndBacklog(
 				TestBufferFactory.createBuffer(10),
-				buffersInBacklog,
+				buffers > 0,
+				buffers,
 				false);
 		}
 	}
 
-	private static class ReadOnlyBufferResultSubpartitionView extends NoOpResultSubpartitionView {
-		/** Number of buffer in the backlog to report with every {@link #getNextBuffer()} call. */
-		final int buffersInBacklog;
-
+	private static class ReadOnlyBufferResultSubpartitionView extends DefaultBufferResultSubpartitionView {
 		private ReadOnlyBufferResultSubpartitionView(int buffersInBacklog) {
-			this.buffersInBacklog = buffersInBacklog;
+			super(buffersInBacklog);
 		}
 
 		@Nullable
 		@Override
 		public BufferAndBacklog getNextBuffer() {
+			BufferAndBacklog nextBuffer = super.getNextBuffer();
 			return new BufferAndBacklog(
-				TestBufferFactory.createBuffer(10).readOnlySlice(),
-				buffersInBacklog,
-				false);
+				nextBuffer.buffer().readOnlySlice(),
+				nextBuffer.isMoreAvailable(),
+				nextBuffer.buffersInBacklog(),
+				nextBuffer.nextBufferIsEvent());
+		}
+	}
+
+	private static class NotReleasedResultSubpartitionView extends NoOpResultSubpartitionView {
+		@Override
+		public boolean isReleased() {
+			return false;
 		}
 	}
 
@@ -195,7 +241,7 @@ public class PartitionRequestQueueTest {
 		assertNull(channel.readOutbound());
 
 		// Notify an available event buffer to trigger enqueue the reader
-		reader.notifyBuffersAvailable(1);
+		reader.notifyDataAvailable();
 
 		channel.runPendingTasks();
 
@@ -226,7 +272,7 @@ public class PartitionRequestQueueTest {
 	@Test
 	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
 		// setup
-		final ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(2);
+		final ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(10);
 
 		ResultPartitionProvider partitionProvider =
 			(partitionId, index, availabilityListener) -> view;
@@ -246,7 +292,7 @@ public class PartitionRequestQueueTest {
 		// Notify available buffers to trigger enqueue the reader
 		final int notifyNumBuffers = 5;
 		for (int i = 0; i < notifyNumBuffers; i++) {
-			reader.notifyBuffersAvailable(1);
+			reader.notifyDataAvailable();
 		}
 
 		channel.runPendingTasks();
@@ -254,7 +300,7 @@ public class PartitionRequestQueueTest {
 		// the reader is not enqueued in the pipeline because no credits are available
 		// -> it should still have the same number of pending buffers
 		assertEquals(0, queue.getAvailableReaders().size());
-		assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
+		assertTrue(reader.hasBuffersAvailable());
 		assertFalse(reader.isRegisteredAsAvailable());
 		assertEquals(0, reader.getNumCreditsAvailable());
 
@@ -269,7 +315,7 @@ public class PartitionRequestQueueTest {
 			assertTrue(reader.isRegisteredAsAvailable());
 			assertThat(queue.getAvailableReaders(), contains(reader)); // contains only (this) one!
 			assertEquals(i, reader.getNumCreditsAvailable());
-			assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
+			assertTrue(reader.hasBuffersAvailable());
 		}
 
 		// Flush the buffer to make the channel writable again and see the final results
@@ -278,7 +324,7 @@ public class PartitionRequestQueueTest {
 
 		assertEquals(0, queue.getAvailableReaders().size());
 		assertEquals(0, reader.getNumCreditsAvailable());
-		assertEquals(notifyNumBuffers - notifyNumCredits, reader.getNumBuffersAvailable());
+		assertTrue(reader.hasBuffersAvailable());
 		assertFalse(reader.isRegisteredAsAvailable());
 		for (int i = 1; i <= notifyNumCredits; i++) {
 			assertThat(channel.readOutbound(), instanceOf(NettyMessage.BufferResponse.class));
@@ -316,7 +362,7 @@ public class PartitionRequestQueueTest {
 		}
 
 		@Override
-		public void notifyBuffersAvailable(long buffers) {
+		public void notifyDataAvailable() {
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 8646168..5360041 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -68,7 +68,7 @@ public class ServerTransportErrorHandlingTest {
 				@Override
 				public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
 					BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
-					listener.notifyBuffersAvailable(Long.MAX_VALUE);
+					listener.notifyDataAvailable();
 					return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
new file mode 100644
index 0000000..2b6b834
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Test implementation of {@link BufferAvailabilityListener}.
+ */
+class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
+
+	private long numNotifications;
+
+	@Override
+	public void notifyDataAvailable() {
+		++numNotifications;
+	}
+
+	public long getNumNotifications() {
+		return numNotifications;
+	}
+
+	public void resetNotificationCounters() {
+		numNotifications = 0;
+	}
+
+	void awaitNotifications(long awaitedNumNotifications, long timeoutMillis) throws InterruptedException {
+		long deadline = System.currentTimeMillis() + timeoutMillis;
+		while (numNotifications < awaitedNumNotifications && System.currentTimeMillis() < deadline) {
+			Thread.sleep(1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
new file mode 100644
index 0000000..4162975
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Test implementation of {@link BufferAvailabilityListener}.
+ */
+class NoOpBufferAvailablityListener implements BufferAvailabilityListener {
+	@Override
+	public void notifyDataAvailable() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 76e6f2c..ced1a33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -123,8 +123,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 			for (int i = 0; i < 8; i++) {
 				final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
 				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
-
 				Thread.sleep(50);
+				bufferBuilder.finish();
 			}
 		}
 	}


Mime
View raw message