flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/3] flink git commit: [FLINK-1755] Fix possible NullPointerException in LocalInputChannel
Date Thu, 26 Mar 2015 10:51:54 GMT
Repository: flink
Updated Branches:
  refs/heads/master b13c9a7dd -> 0b86903dc


[FLINK-1755] Fix possible NullPointerException in LocalInputChannel

This commit squashes:

 - [runtime] Rename create read view methods to be consistent
 - [runtime] Fix spillable subpartition view read offset after spilling
 - [tests] Add comment to ScheduleOrUpdateConsumersTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b86903d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b86903d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b86903d

Branch: refs/heads/master
Commit: 0b86903dc88bae3c40b8b22a02ecb8faccd737a4
Parents: d72a3f7
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Mar 25 11:20:28 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Mar 26 11:51:26 2015 +0100

----------------------------------------------------------------------
 .../runtime/io/network/NetworkEnvironment.java  |   2 +-
 .../netty/PartitionRequestServerHandler.java    |   6 +-
 .../partition/PipelinedSubpartition.java        |   3 +-
 .../partition/PipelinedSubpartitionView.java    |   1 -
 .../io/network/partition/ResultPartition.java   |   7 +-
 .../partition/ResultPartitionManager.java       |   9 +-
 .../partition/ResultPartitionProvider.java      |   6 +-
 .../network/partition/ResultSubpartition.java   |   3 +-
 .../partition/SpillableSubpartition.java        |   9 +-
 .../partition/SpillableSubpartitionView.java    |   4 +-
 .../partition/consumer/BufferOrEvent.java       |  11 +-
 .../partition/consumer/LocalInputChannel.java   |  50 ++--
 .../partition/PipelinedSubpartitionTest.java    |  12 +-
 .../consumer/LocalInputChannelTest.java         | 283 +++++++++++++++++++
 .../partition/consumer/SingleInputGateTest.java |   9 +-
 .../io/network/util/TestBufferFactory.java      |  16 +-
 .../io/network/util/TestPartitionProducer.java  |   8 +-
 .../network/util/TestSubpartitionProducer.java  |   7 +-
 .../ScheduleOrUpdateConsumersTest.java          |  26 +-
 .../test/iterative/KMeansForTestITCase.java     | 276 ------------------
 20 files changed, 398 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 6a1e8a6..e5dc8a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -153,7 +153,7 @@ public class NetworkEnvironment {
 				bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
 				partition.registerBufferPool(bufferPool);
 
-				partitionManager.registerIntermediateResultPartition(partition);
+				partitionManager.registerResultPartition(partition);
 			}
 			catch (Throwable t) {
 				if (bufferPool != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index cb26e51..6f4becd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import com.google.common.base.Optional;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -90,10 +88,10 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 				LOG.debug("Read channel on {}: {}.",ctx.channel().localAddress(), request);
 
 				ResultSubpartitionView queueIterator =
-						partitionProvider.getSubpartition(
+						partitionProvider.createSubpartitionView(
 								request.partitionId,
 								request.queueIndex,
-								Optional.<BufferProvider>of(bufferPool));
+								bufferPool);
 
 				if (queueIterator != null) {
 					outboundQueue.enqueue(queueIterator, request.receiverId);

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index a8a0a7b..4c8174a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -168,7 +167,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public PipelinedSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) {
+	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
 		synchronized (buffers) {
 			if (readView != null) {
 				throw new IllegalStateException("Subpartition is being or already has been " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 822e33a..1e44324 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -50,7 +50,6 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	@Override
 	public boolean registerListener(NotificationListener listener) {
 		return !isReleased.get() && parent.registerListener(listener);
-
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ddd47dd..92e27d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -169,7 +168,7 @@ public class ResultPartition implements BufferPoolOwner {
 	 * to the life-cycle of task registrations in the {@link TaskManager}.
 	 */
 	public void registerBufferPool(BufferPool bufferPool) {
-		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == getNumberOfSubpartitions(),
+		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
 				"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
 
 		checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
@@ -302,13 +301,13 @@ public class ResultPartition implements BufferPoolOwner {
 	/**
 	 * Returns the requested subpartition.
 	 */
-	public ResultSubpartitionView getSubpartition(int index, Optional<BufferProvider> bufferProvider) throws IOException {
+	public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider) throws IOException {
 		int refCnt = pendingReferences.get();
 
 		checkState(refCnt != -1, "Partition released.");
 		checkState(refCnt > 0, "Partition not pinned.");
 
-		return subpartitions[index].getReadView(bufferProvider);
+		return subpartitions[index].createReadView(bufferProvider);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index c120de8..a666208 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Table;
@@ -46,7 +45,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 
 	private boolean isShutdown;
 
-	public void registerIntermediateResultPartition(ResultPartition partition) throws IOException {
+	public void registerResultPartition(ResultPartition partition) throws IOException {
 		synchronized (registeredPartitions) {
 			checkState(!isShutdown, "Result partition manager already shut down.");
 
@@ -64,10 +63,10 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 	}
 
 	@Override
-	public ResultSubpartitionView getSubpartition(
+	public ResultSubpartitionView createSubpartitionView(
 			ResultPartitionID partitionId,
 			int subpartitionIndex,
-			Optional<BufferProvider> bufferProvider) throws IOException {
+			BufferProvider bufferProvider) throws IOException {
 
 		synchronized (registeredPartitions) {
 			final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
@@ -79,7 +78,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 
 			LOG.debug("Requested partition {}.", partition);
 
-			return partition.getSubpartition(subpartitionIndex, bufferProvider);
+			return partition.createSubpartitionView(subpartitionIndex, bufferProvider);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 1f35f59..23dd1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
 import java.io.IOException;
@@ -28,6 +27,9 @@ public interface ResultPartitionProvider {
 	/**
 	 * Returns the requested intermediate result partition input view.
 	 */
-	ResultSubpartitionView getSubpartition(ResultPartitionID partitionId, int index, Optional<BufferProvider> bufferProvider) throws IOException;
+	ResultSubpartitionView createSubpartitionView(
+			ResultPartitionID partitionId,
+			int index,
+			BufferProvider bufferProvider) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 1538a1a..e2ce16e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
@@ -74,7 +73,7 @@ public abstract class ResultSubpartition {
 
 	abstract public void release() throws IOException;
 
-	abstract public ResultSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) throws IOException;
+	abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index da6a847..7ec24ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -169,7 +168,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public ResultSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) throws IOException {
+	public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
 		synchronized (buffers) {
 			if (!isFinished) {
 				throw new IllegalStateException("Subpartition has not been finished yet, " +
@@ -190,14 +189,14 @@ class SpillableSubpartition extends ResultSubpartition {
 				if (ioMode.isSynchronous()) {
 					readView = new SpilledSubpartitionViewSyncIO(
 							this,
-							bufferProvider.get().getMemorySegmentSize(),
+							bufferProvider.getMemorySegmentSize(),
 							spillWriter.getChannelID(),
 							0);
 				}
 				else {
 					readView = new SpilledSubpartitionViewAsyncIO(
 							this,
-							bufferProvider.get(),
+							bufferProvider,
 							ioManager,
 							spillWriter.getChannelID(),
 							0);
@@ -205,7 +204,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			}
 			else {
 				readView = new SpillableSubpartitionView(
-						this, bufferProvider.get(), buffers.size(), ioMode);
+						this, bufferProvider, buffers.size(), ioMode);
 			}
 
 			return readView;

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 59b1464..3d362d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -107,7 +107,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 					parent,
 					bufferProvider.getMemorySegmentSize(),
 					parent.spillWriter.getChannelID(),
-					0);
+					currentBytesRead);
 		}
 		else {
 			spilledView = new SpilledSubpartitionViewAsyncIO(
@@ -115,7 +115,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 					bufferProvider,
 					parent.ioManager,
 					parent.spillWriter.getChannelID(),
-					0);
+					currentBytesRead);
 		}
 
 		return spilledView.getNextBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 032772c..d2f3035 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Either type for {@link Buffer} or {@link AbstractEvent} instances tagged with the channel index,
@@ -36,14 +37,14 @@ public class BufferOrEvent {
 	private int channelIndex;
 
 	public BufferOrEvent(Buffer buffer, int channelIndex) {
-		this.buffer = buffer;
+		this.buffer = checkNotNull(buffer);
 		this.event = null;
 		this.channelIndex = channelIndex;
 	}
 
 	public BufferOrEvent(AbstractEvent event, int channelIndex) {
 		this.buffer = null;
-		this.event = event;
+		this.event = checkNotNull(event);
 		this.channelIndex = channelIndex;
 	}
 
@@ -71,4 +72,10 @@ public class BufferOrEvent {
 		checkArgument(channelIndex >= 0);
 		this.channelIndex = channelIndex;
 	}
+
+	@Override
+	public String toString() {
+		return String.format("BufferOrEvent [%s, channelIndex = %d]",
+				isBuffer() ? buffer : event, channelIndex);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 8ea6407..7cb62f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import com.google.common.base.Optional;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -43,24 +42,27 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
 
+	/** The local partition manager. */
 	private final ResultPartitionManager partitionManager;
 
+	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	private ResultSubpartitionView queueIterator;
+	/** The consumed subpartition */
+	private volatile ResultSubpartitionView subpartitionView;
 
 	private volatile boolean isReleased;
 
 	private volatile Buffer lookAhead;
 
 	LocalInputChannel(
-			SingleInputGate gate,
+			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher) {
 
-		super(gate, channelIndex, partitionId);
+		super(inputGate, channelIndex, partitionId);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -72,14 +74,15 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
-		if (queueIterator == null) {
-			LOG.debug("Requesting LOCAL queue {} of partition {}.", subpartitionIndex, partitionId);
+		if (subpartitionView == null) {
+			LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
+					this, subpartitionIndex, partitionId);
 
-			queueIterator = partitionManager
-					.getSubpartition(partitionId, subpartitionIndex, Optional.of(inputGate.getBufferProvider()));
+			subpartitionView = partitionManager.createSubpartitionView(
+					partitionId, subpartitionIndex, inputGate.getBufferProvider());
 
-			if (queueIterator == null) {
-				throw new IOException("Error requesting sub partition.");
+			if (subpartitionView == null) {
+				throw new IOException("Error requesting subpartition.");
 			}
 
 			getNextLookAhead();
@@ -88,11 +91,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	Buffer getNextBuffer() throws IOException, InterruptedException {
-		checkState(queueIterator != null, "Queried for a buffer before requesting a queue.");
+		checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
 
 		// After subscribe notification
 		if (lookAhead == null) {
-			lookAhead = queueIterator.getNextBuffer();
+			lookAhead = subpartitionView.getNextBuffer();
 		}
 
 		Buffer next = lookAhead;
@@ -116,7 +119,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	void sendTaskEvent(TaskEvent event) throws IOException {
-		checkState(queueIterator != null, "Tried to send task event to producer before requesting a queue.");
+		checkState(subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
 
 		if (!taskEventDispatcher.publish(partitionId, event)) {
 			throw new IOException("Error while publishing event " + event + " to producer. The producer could not be found.");
@@ -134,8 +137,8 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	void notifySubpartitionConsumed() throws IOException {
-		if (queueIterator != null) {
-			queueIterator.notifySubpartitionConsumed();
+		if (subpartitionView != null) {
+			subpartitionView.notifySubpartitionConsumed();
 		}
 	}
 
@@ -151,9 +154,9 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 				lookAhead = null;
 			}
 
-			if (queueIterator != null) {
-				queueIterator.releaseAllResources();
-				queueIterator = null;
+			if (subpartitionView != null) {
+				subpartitionView.releaseAllResources();
+				subpartitionView = null;
 			}
 
 			isReleased = true;
@@ -186,15 +189,22 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	// ------------------------------------------------------------------------
 
 	private void getNextLookAhead() throws IOException, InterruptedException {
+
+		final ResultSubpartitionView view = subpartitionView;
+
+		if (view == null) {
+			return;
+		}
+
 		while (true) {
-			lookAhead = queueIterator.getNextBuffer();
+			lookAhead = view.getNextBuffer();
 
 			if (lookAhead != null) {
 				notifyAvailableBuffer();
 				break;
 			}
 
-			if (queueIterator.registerListener(this) || queueIterator.isReleased()) {
+			if (view.registerListener(this) || view.isReleased()) {
 				return;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 68c12f4..74549df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.base.Optional;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
@@ -130,10 +129,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		final PipelinedSubpartition subpartition = createSubpartition();
 
 		// Successful request
-		assertNotNull(subpartition.getReadView(Optional.<BufferProvider>absent()));
+		assertNotNull(subpartition.createReadView(null));
 
 		try {
-			subpartition.getReadView(Optional.<BufferProvider>absent());
+			subpartition.createReadView(null);
 
 			fail("Did not throw expected exception after duplicate read view request.");
 		}
@@ -147,7 +146,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		TestNotificationListener listener = new TestNotificationListener();
 
-		ResultSubpartitionView view = subpartition.getReadView(Optional.<BufferProvider>absent());
+		ResultSubpartitionView view = subpartition.createReadView(null);
 
 		// Empty => should return null
 		assertNull(view.getNextBuffer());
@@ -262,8 +261,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		final PipelinedSubpartition subpartition = createSubpartition();
 
-		final PipelinedSubpartitionView view = subpartition.getReadView(
-				Optional.<BufferProvider>absent());
+		final PipelinedSubpartitionView view = subpartition.createReadView(null);
 
 		Future<Boolean> producer = executorService.submit(
 				new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
new file mode 100644
index 0000000..cc90c44
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
+import org.apache.flink.runtime.io.network.util.TestProducerSource;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
+import static org.mockito.Mockito.mock;
+
+public class LocalInputChannelTest {
+
+	/**
+	 * Tests the consumption of multiple subpartitions via local input channels.
+	 *
+	 * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple
+	 * tasks via local input channels.
+	 */
+	@Test
+	public void testConcurrentConsumeMultiplePartitions() throws Exception {
+		// Config
+		final int parallelism = 32;
+		final int producerBufferPoolSize = parallelism + 1;
+		final int numberOfBuffersPerChannel = 1024;
+
+		checkArgument(parallelism >= 1);
+		checkArgument(producerBufferPoolSize >= parallelism);
+		checkArgument(numberOfBuffersPerChannel >= 1);
+
+		// Setup
+		// One thread per produced partition and one per consumer
+		final ExecutorService executor = Executors.newFixedThreadPool(2 * parallelism);
+
+		final NetworkBufferPool networkBuffers = new NetworkBufferPool(
+				(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
+				TestBufferFactory.BUFFER_SIZE);
+
+		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
+				mock(ResultPartitionConsumableNotifier.class);
+
+		final IOManager ioManager = mock(IOManager.class);
+
+		final JobID jobId = new JobID();
+
+		final ResultPartitionManager partitionManager = new ResultPartitionManager();
+
+		final ResultPartitionID[] partitionIds = new ResultPartitionID[parallelism];
+		final TestPartitionProducer[] partitionProducers = new TestPartitionProducer[parallelism];
+
+		// Create all partitions
+		for (int i = 0; i < parallelism; i++) {
+			partitionIds[i] = new ResultPartitionID();
+
+			final ResultPartition partition = new ResultPartition(
+					jobId,
+					partitionIds[i],
+					ResultPartitionType.PIPELINED,
+					parallelism,
+					partitionManager,
+					partitionConsumableNotifier,
+					ioManager,
+					ASYNC);
+
+			// Create a buffer pool for this partition
+			partition.registerBufferPool(
+					networkBuffers.createBufferPool(producerBufferPoolSize, true));
+
+			// Create the producer
+			partitionProducers[i] = new TestPartitionProducer(
+					partition,
+					false,
+					new TestPartitionProducerBufferSource(
+							parallelism,
+							partition.getBufferProvider(),
+							numberOfBuffersPerChannel)
+			);
+
+			// Register with the partition manager in order to allow the local input channels to
+			// request their respective partitions.
+			partitionManager.registerResultPartition(partition);
+		}
+
+		// Test
+		try {
+			// Submit producer tasks
+			List<Future<?>> results = Lists.newArrayListWithCapacity(
+					parallelism + 1);
+
+			for (int i = 0; i < parallelism; i++) {
+				results.add(executor.submit(partitionProducers[i]));
+			}
+
+			// Submit consumer
+			for (int i = 0; i < parallelism; i++) {
+				results.add(executor.submit(
+						new TestLocalInputChannelConsumer(
+								i,
+								parallelism,
+								numberOfBuffersPerChannel,
+								networkBuffers.createBufferPool(parallelism, true),
+								partitionManager,
+								new TaskEventDispatcher(),
+								partitionIds)));
+			}
+
+			// Wait for all to finish
+			for (Future<?> result : results) {
+				result.get();
+			}
+		}
+		finally {
+			networkBuffers.destroy();
+			executor.shutdown();
+		}
+	}
+
+	/**
+	 * Returns the configured number of buffers for each channel in a random order.
+	 */
+	private static class TestPartitionProducerBufferSource implements TestProducerSource {
+
+		private final BufferProvider bufferProvider;
+
+		private final List<Byte> channelIndexes;
+
+		public TestPartitionProducerBufferSource(
+				int parallelism,
+				BufferProvider bufferProvider,
+				int numberOfBuffersToProduce) {
+
+			this.bufferProvider = bufferProvider;
+			this.channelIndexes = Lists.newArrayListWithCapacity(
+					parallelism * numberOfBuffersToProduce);
+
+			// Array of channel indexes to produce buffers for
+			for (byte i = 0; i < parallelism; i++) {
+				for (int j = 0; j < numberOfBuffersToProduce; j++) {
+					channelIndexes.add(i);
+				}
+			}
+
+			// Random buffer to channel ordering
+			Collections.shuffle(channelIndexes);
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws Exception {
+			if (channelIndexes.size() > 0) {
+				final int channelIndex = channelIndexes.remove(0);
+
+				return new BufferOrEvent(bufferProvider.requestBufferBlocking(), channelIndex);
+			}
+
+			return null;
+		}
+	}
+
+	/**
+	 * Consumed the configured result partitions and verifies that each channel receives the
+	 * expected number of buffers.
+	 */
+	private static class TestLocalInputChannelConsumer implements Callable<Void> {
+
+		private final SingleInputGate inputGate;
+
+		private final int numberOfInputChannels;
+
+		private final int numberOfExpectedBuffersPerChannel;
+
+		public TestLocalInputChannelConsumer(
+				int subpartitionIndex,
+				int numberOfInputChannels,
+				int numberOfExpectedBuffersPerChannel,
+				BufferPool bufferPool,
+				ResultPartitionManager partitionManager,
+				TaskEventDispatcher taskEventDispatcher,
+				ResultPartitionID[] consumedPartitionIds) {
+
+			checkArgument(numberOfInputChannels >= 1);
+			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
+
+			this.inputGate = new SingleInputGate(
+					new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels);
+
+			// Set buffer pool
+			inputGate.setBufferPool(bufferPool);
+
+			// Setup input channels
+			for (int i = 0; i < numberOfInputChannels; i++) {
+				inputGate.setInputChannel(
+						new IntermediateResultPartitionID(),
+						new LocalInputChannel(
+								inputGate,
+								i,
+								consumedPartitionIds[i],
+								partitionManager,
+								taskEventDispatcher));
+			}
+
+			this.numberOfInputChannels = numberOfInputChannels;
+			this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;
+		}
+
+		@Override
+		public Void call() throws Exception {
+			// One counter per input channel. Expect the same number of buffers from each channel.
+			final int[] numberOfBuffersPerChannel = new int[numberOfInputChannels];
+
+			try {
+				BufferOrEvent boe;
+				while ((boe = inputGate.getNextBufferOrEvent()) != null) {
+					if (boe.isBuffer()) {
+						boe.getBuffer().recycle();
+
+						// Check that we don't receive too many buffers
+						if (++numberOfBuffersPerChannel[boe.getChannelIndex()]
+								> numberOfExpectedBuffersPerChannel) {
+
+							throw new IllegalStateException("Received more buffers than expected " +
+									"on channel " + boe.getChannelIndex() + ".");
+						}
+					}
+				}
+
+				// Verify that we received the expected number of buffers on each channel
+				for (int i = 0; i < numberOfBuffersPerChannel.length; i++) {
+					final int actualNumberOfReceivedBuffers = numberOfBuffersPerChannel[i];
+
+					if (actualNumberOfReceivedBuffers != numberOfExpectedBuffersPerChannel) {
+						throw new IllegalStateException("Received unexpected number of buffers " +
+								"on channel " + i + " (" + actualNumberOfReceivedBuffers + " instead " +
+								"of " + numberOfExpectedBuffersPerChannel + ").");
+					}
+				}
+			}
+			finally {
+				inputGate.releaseAllResources();
+			}
+
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 5871514..66eeee0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import com.google.common.base.Optional;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -90,7 +90,6 @@ public class SingleInputGateTest {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testBackwardsEventWithUninitializedChannel() throws Exception {
 		// Setup environment
 		final TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
@@ -100,7 +99,7 @@ public class SingleInputGateTest {
 		when(iterator.getNextBuffer()).thenReturn(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		when(partitionManager.getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class))).thenReturn(iterator);
+		when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator);
 
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
@@ -129,7 +128,7 @@ public class SingleInputGateTest {
 		inputGate.requestPartitions();
 
 		// Only the local channel can request
-		verify(partitionManager, times(1)).getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class));
+		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
 
 		// Send event backwards and initialize unknown channel afterwards
 		final TaskEvent event = new TestTaskEvent();
@@ -141,7 +140,7 @@ public class SingleInputGateTest {
 		// After the update, the pending event should be send to local channel
 		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
 
-		verify(partitionManager, times(2)).getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class));
+		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
 		verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 0ff42b6..cdba545 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -30,11 +30,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 public class TestBufferFactory {
 
-	private static final int defaultSize = 32 * 1024;
+	public static final int BUFFER_SIZE = 32 * 1024;
 
-	private static final BufferRecycler discardingRecycler = new DiscardingRecycler();
+	private static final BufferRecycler RECYCLER = new DiscardingRecycler();
 
-	private static final Buffer mockBuffer = createBuffer();
+	private static final Buffer MOCK_BUFFER = createBuffer();
 
 	private final int bufferSize;
 
@@ -43,11 +43,11 @@ public class TestBufferFactory {
 	private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
 
 	public TestBufferFactory() {
-		this(defaultSize, discardingRecycler);
+		this(BUFFER_SIZE, RECYCLER);
 	}
 
 	public TestBufferFactory(int bufferSize) {
-		this(bufferSize, discardingRecycler);
+		this(bufferSize, RECYCLER);
 	}
 
 	public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
@@ -79,16 +79,16 @@ public class TestBufferFactory {
 	// ------------------------------------------------------------------------
 
 	public static Buffer createBuffer() {
-		return createBuffer(defaultSize);
+		return createBuffer(BUFFER_SIZE);
 	}
 
 	public static Buffer createBuffer(int bufferSize) {
 		checkArgument(bufferSize > 0);
 
-		return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler);
+		return new Buffer(new MemorySegment(new byte[bufferSize]), RECYCLER);
 	}
 
 	public static Buffer getMockBuffer() {
-		return mockBuffer;
+		return MOCK_BUFFER;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index 31fd4a4..d6dd9ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -68,6 +68,8 @@ public class TestPartitionProducer implements Callable<Boolean> {
 	@Override
 	public Boolean call() throws Exception {
 
+		boolean success = false;
+
 		try {
 			BufferOrEvent bufferOrEvent;
 
@@ -98,10 +100,14 @@ public class TestPartitionProducer implements Callable<Boolean> {
 
 			partition.finish();
 
+			success = true;
+
 			return true;
 		}
 		finally {
-			partition.release();
+			if (!success) {
+				partition.release();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
index 52c156e..e5312ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -68,6 +68,8 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
 	@Override
 	public Boolean call() throws Exception {
 
+		boolean success = false;
+
 		try {
 			BufferOrEvent bufferOrEvent;
 
@@ -96,10 +98,13 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
 
 			subpartition.finish();
 
+			success = true;
 			return true;
 		}
 		finally {
-			subpartition.release();
+			if (!success) {
+				subpartition.release();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 721ccf4..70bbc25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -66,6 +66,23 @@ public class ScheduleOrUpdateConsumersTest {
 		flink.stop();
 	}
 
+	/**
+	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
+	 * result.
+	 *
+	 * <pre>
+	 *                             +----------+
+	 *            +-- pipelined -> | Receiver |
+	 * +--------+ |                +----------+
+	 * | Sender |-|
+	 * +--------+ |                +----------+
+	 *            +-- blocking --> | Receiver |
+	 *                             +----------+
+	 * </pre>
+	 *
+	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
+	 * one after all subtasks are finished.
+	 */
 	@Test
 	public void testMixedPipelinedAndBlockingResults() throws Exception {
 		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
@@ -92,13 +109,18 @@ public class ScheduleOrUpdateConsumersTest {
 				DistributionPattern.ALL_TO_ALL,
 				ResultPartitionType.BLOCKING);
 
-		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
+		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
 
 		sender.setSlotSharingGroup(slotSharingGroup);
 		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
 		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
 
-		final JobGraph jobGraph = new JobGraph("", sender, pipelinedReceiver, blockingReceiver);
+		final JobGraph jobGraph = new JobGraph(
+				"Mixed pipelined and blocking result",
+				sender,
+				pipelinedReceiver,
+				blockingReceiver);
 
 		JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b86903d/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
deleted file mode 100644
index 732bd06..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.localDistributed.PackagedProgramEndToEndITCase;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.testjar.KMeansForTest;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * This K-Means is a copy of {@link KMeansForTest} from the {@link PackagedProgramEndToEndITCase},
- * which detected a problem with the wiring of blocking intermediate results reproducibly with
- * multiple runs, whereas other tests didn't.
- *
- * <p> The code is copied here, because the packaged program test removes the classes from the
- * classpath.
- *
- * <p> It's safe to remove this test in the future.
- */
-public class KMeansForTestITCase extends JavaProgramTestBase {
-
-	protected String dataPath;
-	protected String clusterPath;
-	protected String resultPath;
-
-	public KMeansForTestITCase(){
-		setNumTaskManagers(2);
-		setTaskManagerNumSlots(2);
-		setNumberOfTestRepetitions(10);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-		clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		int numIterations = 20;
-
-		// get input data
-		DataSet<Point> points = env.readCsvFile(dataPath)
-				.fieldDelimiter("|")
-				.includeFields(true, true)
-				.types(Double.class, Double.class)
-				.map(new TuplePointConverter());
-
-		DataSet<Centroid> centroids = env.readCsvFile(clusterPath)
-				.fieldDelimiter("|")
-				.includeFields(true, true, true)
-				.types(Integer.class, Double.class, Double.class)
-				.map(new TupleCentroidConverter());
-
-		// set number of bulk iterations for KMeans algorithm
-		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
-
-		DataSet<Centroid> newCentroids = points
-				// compute closest centroid for each point
-				.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
-						// count and sum point coordinates for each centroid
-				.map(new CountAppender())
-						// !test if key expressions are working!
-				.groupBy("field0").reduce(new CentroidAccumulator())
-						// compute new centroids from point counts and coordinate sums
-				.map(new CentroidAverager());
-
-		// feed new centroids back into next iteration
-		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
-
-		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
-				// assign points to final clusters
-				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
-
-		// emit result
-		clusteredPoints.writeAsCsv(resultPath, "\n", " ");
-
-		env.execute("KMeansForTest");
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * A simple two-dimensional point.
-	 */
-	public static class Point implements Serializable {
-
-		public double x, y;
-
-		public Point() {}
-
-		public Point(double x, double y) {
-			this.x = x;
-			this.y = y;
-		}
-
-		public Point add(Point other) {
-			x += other.x;
-			y += other.y;
-			return this;
-		}
-
-		public Point div(long val) {
-			x /= val;
-			y /= val;
-			return this;
-		}
-
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
-		}
-
-		public void clear() {
-			x = y = 0.0;
-		}
-
-		@Override
-		public String toString() {
-			return x + " " + y;
-		}
-	}
-
-	/**
-	 * A simple two-dimensional centroid, basically a point with an ID.
-	 */
-	public static class Centroid extends Point {
-
-		public int id;
-
-		public Centroid() {}
-
-		public Centroid(int id, double x, double y) {
-			super(x,y);
-			this.id = id;
-		}
-
-		public Centroid(int id, Point p) {
-			super(p.x, p.y);
-			this.id = id;
-		}
-
-		@Override
-		public String toString() {
-			return id + " " + super.toString();
-		}
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/** Converts a Tuple2<Double,Double> into a Point. */
-	public static final class TuplePointConverter extends RichMapFunction<Tuple2<Double, Double>, Point> {
-
-		@Override
-		public Point map(Tuple2<Double, Double> t) throws Exception {
-			return new Point(t.f0, t.f1);
-		}
-	}
-
-	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
-	public static final class TupleCentroidConverter extends RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> {
-
-		@Override
-		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
-			return new Centroid(t.f0, t.f1, t.f2);
-		}
-	}
-
-	/** Determines the closest cluster center for a data point. */
-	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
-		private Collection<Centroid> centroids;
-
-		/** Reads the centroid values from a broadcast variable into a collection. */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
-		}
-
-		@Override
-		public Tuple2<Integer, Point> map(Point p) throws Exception {
-
-			double minDistance = Double.MAX_VALUE;
-			int closestCentroidId = -1;
-
-			// check all cluster centers
-			for (Centroid centroid : centroids) {
-				// compute distance
-				double distance = p.euclideanDistance(centroid);
-
-				// update nearest cluster if necessary
-				if (distance < minDistance) {
-					minDistance = distance;
-					closestCentroidId = centroid.id;
-				}
-			}
-
-			// emit a new record with the center id and the data point.
-			return new Tuple2<Integer, Point>(closestCentroidId, p);
-		}
-	}
-
-	// Use this so that we can check whether POJOs and the POJO comparator also work
-	public static final class DummyTuple3IntPointLong {
-		public Integer field0;
-		public Point field1;
-		public Long field2;
-
-		public DummyTuple3IntPointLong() {}
-
-		DummyTuple3IntPointLong(Integer f0, Point f1, Long f2) {
-			this.field0 = f0;
-			this.field1 = f1;
-			this.field2 = f2;
-		}
-	}
-
-	/** Appends a count variable to the tuple. */
-	public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
-
-		@Override
-		public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
-			return new DummyTuple3IntPointLong(t.f0, t.f1, 1L);
-		}
-	}
-
-	/** Sums and counts point coordinates. */
-	public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> {
-
-		@Override
-		public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) {
-			return new DummyTuple3IntPointLong(val1.field0, val1.field1.add(val2.field1), val1.field2 + val2.field2);
-		}
-	}
-
-	/** Computes new centroid from coordinate sum and count of points. */
-	public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> {
-
-		@Override
-		public Centroid map(DummyTuple3IntPointLong value) {
-			return new Centroid(value.field0, value.field1.div(value.field2));
-		}
-	}
-}


Mime
View raw message