flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [08/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
Date Wed, 18 Mar 2015 16:48:57 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
deleted file mode 100644
index 174211a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import akka.actor.ActorRef;
-import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
-import com.google.common.base.Optional;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-import org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueue;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-public class IntermediateResultPartition implements BufferPoolOwner {
-
-	private static final Logger LOG = LoggerFactory.getLogger(IntermediateResultPartition.class);
-
-	private final RuntimeEnvironment environment;
-
-	/**
-	 * Note: This index needs to correspond to the index of the partition in
-	 * {@link ExecutionVertex#resultPartitions}, which might be a little
-	 * fragile as the data availability notifications use it.
-	 */
-	private final int partitionIndex;
-
-	private final JobID jobId;
-
-	private final ExecutionAttemptID producerExecutionId;
-
-	private final IntermediateResultPartitionID partitionId;
-
-	private final IntermediateResultPartitionType partitionType;
-
-	private final NetworkEnvironment networkEnvironment;
-
-	private final IntermediateResultPartitionQueue[] queues;
-
-	private volatile boolean hasNotifiedConsumers;
-
-	private volatile boolean isReleased;
-
-	private boolean isFinished;
-
-	private BufferPool bufferPool;
-
-	public IntermediateResultPartition(RuntimeEnvironment environment, int partitionIndex, JobID jobId,
-			ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, IntermediateResultPartitionType partitionType,
-			IntermediateResultPartitionQueue[] partitionQueues, NetworkEnvironment networkEnvironment) {
-		this.environment = environment;
-		this.partitionIndex = partitionIndex;
-		this.jobId = jobId;
-		this.producerExecutionId = executionId;
-		this.partitionId = partitionId;
-		this.partitionType = partitionType;
-		this.networkEnvironment = networkEnvironment;
-		this.queues = partitionQueues;
-	}
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	public void setBufferPool(BufferPool bufferPool) {
-		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == getNumberOfQueues(),
-				"Buffer pool has not enough buffers for this intermediate result.");
-		checkState(this.bufferPool == null, "Buffer pool has already been set for intermediate result partition.");
-
-		this.bufferPool = checkNotNull(bufferPool);
-	}
-
-	public ExecutionAttemptID getProducerExecutionId() {
-		return producerExecutionId;
-	}
-
-	public IntermediateResultPartitionID getPartitionId() {
-		return partitionId;
-	}
-
-	public JobID getJobId() {
-		return jobId;
-	}
-
-	public int getNumberOfQueues() {
-		return queues.length;
-	}
-
-	public BufferProvider getBufferProvider() {
-		return bufferPool;
-	}
-
-	public boolean isFinished() {
-		return isFinished;
-	}
-
-	// ------------------------------------------------------------------------
-	// Produce
-	// ------------------------------------------------------------------------
-
-	public void add(Buffer buffer, int targetQueue) throws IOException {
-		synchronized (queues) {
-			if (isReleased) {
-				buffer.recycle();
-			}
-			else {
-				checkInProducePhase();
-				queues[targetQueue].add(buffer);
-			}
-		}
-
-		maybeNotifyConsumers(partitionType.isPipelined());
-	}
-
-	public void finish() throws IOException {
-		boolean success = false;
-
-		synchronized (queues) {
-			checkInProducePhase();
-
-			try {
-				if (!isReleased) {
-					for (IntermediateResultPartitionQueue queue : queues) {
-						queue.finish();
-					}
-
-					success = true;
-				}
-			}
-			finally {
-				isFinished = true;
-			}
-		}
-
-		if (success) {
-			// Notify at this point in any case either because of the end
-			// of a blocking result or an empty pipelined result.
-			maybeNotifyConsumers(true);
-
-			if (!partitionType.isPersistent() && bufferPool != null) {
-				// If this partition is not persistent, immediately destroy
-				// the buffer pool. For persistent intermediate results, the
-				// partition manager needs to release the buffer pool.
-				bufferPool.destroy();
-			}
-		}
-	}
-
-	public void releaseAllResources() throws IOException {
-		synchronized (queues) {
-			LOG.debug("Release all resources of {}.", this);
-
-			if (!isReleased) {
-				try {
-					for (IntermediateResultPartitionQueue queue : queues) {
-						try {
-							queue.discard();
-						}
-						catch (Throwable t) {
-							LOG.error("Error while discarding queue: " + t.getMessage(), t);
-						}
-					}
-
-					if (bufferPool != null) {
-						bufferPool.destroy();
-					}
-				}
-				finally {
-					isReleased = true;
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Consume
-	// ------------------------------------------------------------------------
-
-	public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional<BufferProvider> bufferProvider)
-			throws IOException {
-		synchronized (queues) {
-			if (isReleased) {
-				throw new IllegalQueueIteratorRequestException("Intermediate result partition has already been released.");
-			}
-
-			if (queueIndex < 0 || queueIndex >= queues.length) {
-				throw new IllegalQueueIteratorRequestException("Illegal queue index: " + queueIndex + ", allowed: 0-" + (queues.length - 1));
-			}
-
-			return queues[queueIndex].getQueueIterator(bufferProvider);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private void checkInProducePhase() {
-		checkState(!isReleased, "Partition has already been discarded.");
-		checkState(!isFinished, "Partition has already been finished.");
-	}
-
-	/**
-	 * Maybe notifies consumers of this result partition.
-	 */
-	private void maybeNotifyConsumers(boolean doNotify) throws IOException {
-		if (doNotify && !hasNotifiedConsumers) {
-			scheduleOrUpdateConsumers();
-			hasNotifiedConsumers = true;
-		}
-	}
-
-	private void scheduleOrUpdateConsumers() throws IOException {
-		if(!isReleased){
-			ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId,
-					producerExecutionId, partitionIndex);
-
-			Future<Object> futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg,
-					networkEnvironment.getJobManagerTimeout());
-
-			futureResponse.onFailure(new OnFailure(){
-				@Override
-				public void onFailure(Throwable failure) throws Throwable {
-					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
-
-					// Fail task at the TaskManager
-					FailTask failMsg = new FailTask(producerExecutionId,
-							new RuntimeException("Could not schedule or update consumers at " +
-									"the JobManager.", failure));
-
-					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
-				}
-			}, AkkaUtils.globalExecutionContext());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Buffer pool owner methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * If this partition is registered as the {@link BufferPoolOwner} of a
-	 * {@link BufferPool}, it will forward the requests to the queues.
-	 */
-	@Override
-	public void recycleBuffers(int numBuffersToRecycle) throws IOException {
-		int numRecycledBuffers = 0;
-
-		for (IntermediateResultPartitionQueue queue : queues) {
-			numRecycledBuffers += queue.recycleBuffers();
-
-			if (numRecycledBuffers >= numBuffersToRecycle) {
-				break;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId,
-			ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) {
-		final IntermediateResultPartitionID partitionId = checkNotNull(desc.getPartitionId());
-		final IntermediateResultPartitionType partitionType = checkNotNull(desc.getPartitionType());
-
-		final IntermediateResultPartitionQueue[] partitionQueues = new IntermediateResultPartitionQueue[desc.getNumberOfQueues()];
-
-		// TODO The queues need to be created depending on the result type
-		for (int i = 0; i < partitionQueues.length; i++) {
-			partitionQueues[i] = new PipelinedPartitionQueue();
-		}
-
-		return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType,
-				partitionQueues, networkEnvironment);
-	}
-
-	@Override
-	public String toString() {
-		return String.format("IntermediateResultPartition(JobID: %s, ExecutionID: %s, " +
-				"PartitionID: %s, PartitionType: %s, [num queues: %d, (isFinished: %b)",
-				jobId, producerExecutionId, partitionId, partitionType, queues.length, isFinished);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
deleted file mode 100644
index d5b8fe5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * The intermediate result partition manager keeps track of all available
- * partitions of a task manager and
- */
-public class IntermediateResultPartitionManager implements IntermediateResultPartitionProvider {
-
-	private static final Logger LOG = LoggerFactory.getLogger(IntermediateResultPartitionManager.class);
-
-	private final Table<ExecutionAttemptID, IntermediateResultPartitionID,
-			IntermediateResultPartition> partitions = HashBasedTable.create();
-
-	private boolean isShutdown;
-
-	public void registerIntermediateResultPartition(IntermediateResultPartition partition) throws IOException {
-		synchronized (partitions) {
-			LOG.debug("Register intermediate result partition {}.", partition);
-
-			if (isShutdown) {
-				throw new IOException("Intermediate result partition manager has already been shut down.");
-			}
-
-			if (partitions.put(partition.getProducerExecutionId(), partition.getPartitionId(), partition) != null) {
-				throw new IOException("Tried to re-register intermediate result partition.");
-			}
-		}
-	}
-
-	public void failIntermediateResultPartitions(ExecutionAttemptID producerExecutionId) {
-		synchronized (partitions) {
-			List<IntermediateResultPartition> partitionsToFail = new ArrayList<IntermediateResultPartition>();
-
-			for (IntermediateResultPartitionID partitionId : partitions.row(producerExecutionId).keySet()) {
-				partitionsToFail.add(partitions.get(producerExecutionId, partitionId));
-			}
-
-			for(IntermediateResultPartition partition : partitionsToFail) {
-				failIntermediateResultPartition(partition);
-			}
-		}
-	}
-
-	private void failIntermediateResultPartition(IntermediateResultPartition partition) {
-		if (partition != null) {
-			try {
-				partition.releaseAllResources();
-			}
-			catch (Throwable t) {
-				LOG.error("Error during release of produced intermediate result partition: " + t.getMessage(), t);
-			}
-		}
-	}
-
-	public void shutdown() {
-		synchronized (partitions) {
-			for (IntermediateResultPartition partition : partitions.values()) {
-				try {
-					partition.releaseAllResources();
-				}
-				catch (IOException e) {
-					LOG.error("Error while releasing intermediate result partition: " + e.getMessage(), e);
-				}
-			}
-
-			isShutdown = true;
-		}
-	}
-
-	public int getNumberOfRegisteredPartitions() {
-		synchronized (partitions) {
-			return partitions.size();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Intermediate result partition provider
-	// ------------------------------------------------------------------------
-
-	@Override
-	public IntermediateResultPartitionQueueIterator getIntermediateResultPartitionIterator(
-			ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId,
-			int queueIndex,
-			Optional<BufferProvider> bufferProvider) throws IOException {
-
-		synchronized (partitions) {
-			IntermediateResultPartition partition = partitions.get(producerExecutionId, partitionId);
-
-			if (partition == null) {
-				if (!partitions.containsRow(producerExecutionId)) {
-					LOG.debug("Could not find producer execution ID {}. Registered producer" +
-							" execution IDs {}.", producerExecutionId,
-							Arrays.toString(partitions.rowKeySet().toArray()));
-
-					throw new IllegalQueueIteratorRequestException("Unknown producer execution ID " + producerExecutionId + ".");
-				}
-
-				throw new IllegalQueueIteratorRequestException("Unknown partition " + partitionId + ".");
-			}
-
-			return partition.getQueueIterator(queueIndex, bufferProvider);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java
deleted file mode 100644
index b18b3fc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import com.google.common.base.Optional;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import java.io.IOException;
-
-public interface IntermediateResultPartitionProvider {
-
-	IntermediateResultPartitionQueueIterator getIntermediateResultPartitionIterator(
-			ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId,
-			int requestedQueueIndex,
-			Optional<BufferProvider> bufferProvider) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
new file mode 100644
index 0000000..a8a0a7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.util.event.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A pipelined in-memory only subpartition, which can be consumed once.
+ */
+class PipelinedSubpartition extends ResultSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);
+
+	/** Flag indicating whether the subpartition has been finished. */
+	private boolean isFinished;
+
+	/** Flag indicating whether the subpartition has been released. */
+	private boolean isReleased;
+
+	/**
+	 * A data availability listener. Registered when the consuming task is faster than the
+	 * producing task.
+	 */
+	private NotificationListener registeredListener;
+
+	/** The read view to consume this subpartition. */
+	private PipelinedSubpartitionView readView;
+
+	/** All buffers of this subpartition. Access to the buffers is synchronized on this deque. */
+	final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>();
+
+	PipelinedSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public boolean add(Buffer buffer) {
+		checkNotNull(buffer);
+
+		final NotificationListener listener;
+
+		synchronized (buffers) {
+			if (isReleased || isFinished) {
+				return false;
+			}
+
+			// Add the buffer and update the stats
+			buffers.add(buffer);
+			updateStatistics(buffer);
+
+			// Get the listener...
+			listener = registeredListener;
+			registeredListener = null;
+		}
+
+		// Notify the listener outside of the synchronized block
+		if (listener != null) {
+			listener.onNotification();
+		}
+
+		return true;
+	}
+
+	@Override
+	public void finish() {
+		final NotificationListener listener;
+
+		synchronized (buffers) {
+			if (isReleased || isFinished) {
+				return;
+			}
+
+			final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+			buffers.add(buffer);
+			updateStatistics(buffer);
+
+			isFinished = true;
+
+			LOG.debug("Finished {}.", this);
+
+			// Get the listener...
+			listener = registeredListener;
+			registeredListener = null;
+		}
+
+		// Notify the listener outside of the synchronized block
+		if (listener != null) {
+			listener.onNotification();
+		}
+	}
+
+	@Override
+	public void release() {
+		final NotificationListener listener;
+		final PipelinedSubpartitionView view;
+
+		synchronized (buffers) {
+			if (isReleased) {
+				return;
+			}
+
+			// Release all available buffers
+			Buffer buffer;
+			while ((buffer = buffers.poll()) != null) {
+				if (!buffer.isRecycled()) {
+					buffer.recycle();
+				}
+			}
+
+			// Get the view...
+			view = readView;
+			readView = null;
+
+			// Get the listener...
+			listener = registeredListener;
+			registeredListener = null;
+
+			// Make sure that no further buffers are added to the subpartition
+			isReleased = true;
+
+			LOG.debug("Released {}.", this);
+		}
+
+		// Release all resources of the view
+		if (view != null) {
+			view.releaseAllResources();
+		}
+
+		// Notify the listener outside of the synchronized block
+		if (listener != null) {
+			listener.onNotification();
+		}
+	}
+
+	@Override
+	public int releaseMemory() {
+		// The pipelined subpartition does not react to memory release requests. The buffers will be
+		// recycled by the consuming task.
+		return 0;
+	}
+
+	@Override
+	public PipelinedSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) {
+		synchronized (buffers) {
+			if (readView != null) {
+				throw new IllegalStateException("Subpartition is being or already has been " +
+						"consumed, but pipelined subpartitions can only be consumed once.");
+			}
+
+			readView = new PipelinedSubpartitionView(this);
+
+			LOG.debug("Created {}.", readView);
+
+			return readView;
+		}
+	}
+
+	@Override
+	public String toString() {
+		synchronized (buffers) {
+			return String.format("PipelinedSubpartition [number of buffers: %d (%d bytes), " +
+							"finished? %s, read view? %s]",
+					getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null);
+		}
+	}
+
+	/**
+	 * Registers a listener with this subpartition and returns whether the registration was
+	 * successful. Registration fails if buffers are queued or the subpartition is released.
+	 *
+	 * <p>A registered listener is notified when the state of the subpartition changes. After a
+	 * notification, the listener is unregistered. Only a single listener is allowed to be
+	 * registered.
+	 */
+	boolean registerListener(NotificationListener listener) {
+		synchronized (buffers) {
+			if (!buffers.isEmpty() || isReleased) {
+				return false;
+			}
+
+			if (registeredListener == null) {
+				registeredListener = listener;
+
+				return true;
+			}
+
+			throw new IllegalStateException("Already registered listener.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
new file mode 100644
index 0000000..822e33a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * View over a pipelined in-memory only subpartition.
+ */
+class PipelinedSubpartitionView implements ResultSubpartitionView {
+
+	/** The subpartition this view belongs to. */
+	private final PipelinedSubpartition parent;
+
+	/** Flag indicating whether this view has been released. */
+	private AtomicBoolean isReleased = new AtomicBoolean();
+
+	PipelinedSubpartitionView(PipelinedSubpartition parent) {
+		this.parent = checkNotNull(parent);
+	}
+
+	@Override
+	public Buffer getNextBuffer() {
+		synchronized (parent.buffers) {
+			return parent.buffers.poll();
+		}
+	}
+
+	@Override
+	public boolean registerListener(NotificationListener listener) {
+		return !isReleased.get() && parent.registerListener(listener);
+
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() {
+		releaseAllResources();
+	}
+
+	@Override
+	public void releaseAllResources() {
+		if (isReleased.compareAndSet(false, true)) {
+			// The view doesn't hold any resources and the parent cannot be restarted. Therefore,
+			// it's OK to notify about consumption as well.
+			parent.onConsumedSubpartition();
+		}
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
new file mode 100644
index 0000000..95aa636
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnFailure;
+import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
+
+/**
+ * A result partition for data produced by a single task.
+ *
+ * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
+ * or more {@link ResultSubpartition} instances, which further partition the data depending on the
+ * number of consuming tasks and the data {@link DistributionPattern}.
+ *
+ * <p> Tasks that consume a result partition have to request one of its subpartitions. The request
+ * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}).
+ *
+ * <h2>Life-cycle</h2>
+ *
+ * The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <ol>
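+ * <li><strong>Produce</strong>: Buffers are added via {@link #add(Buffer, int)} until {@link #finish()} is called.</li>
+ * <li><strong>Consume</strong>: Consumers request subpartition views via {@link #getSubpartition(int, Optional)}.</li>
+ * <li><strong>Release</strong>: {@link #release()} releases all subpartitions and lazily destroys the buffer pool.</li>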
+ * </ol>
+ *
+ * <h2>Lazy deployment and updates of consuming tasks</h2>
+ *
+ * Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
+ * results, receivers are deployed as soon as the first buffer is added to the result partition.
+ * With blocking results on the other hand, receivers are deployed after the partition is finished.
+ *
+ * <h2>Buffer management</h2>
+ *
+ * <h2>State management</h2>
+ */
+public class ResultPartition implements BufferPoolOwner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
+
+	private final JobID jobId;
+
+	private final ResultPartitionID partitionId;
+
+	/** Type of this partition. Defines the concrete subpartition implementation to use. */
+	private final ResultPartitionType partitionType;
+
+	/** The subpartitions of this partition. At least one. */
+	private final ResultSubpartition[] subpartitions;
+
+	private final NetworkEnvironment networkEnvironment;
+
+	// - Runtime state --------------------------------------------------------
+
+	private final AtomicBoolean isReleased = new AtomicBoolean();
+
+	/**
+	 * The total number of references to subpartitions of this result. The result partition can be
+	 * safely released, iff the reference count is zero. A reference count of -1 denotes that the
+	 * result partition has been released.
+	 */
+	private final AtomicInteger pendingReferences = new AtomicInteger();
+
+	private BufferPool bufferPool;
+
+	private boolean hasNotifiedPipelinedConsumers;
+
+	private boolean isFinished;
+
+	// - Statistics ----------------------------------------------------------
+
+	/** The total number of buffers (both data and event buffers) */
+	private int totalNumberOfBuffers;
+
+	/** The total number of bytes (both data and event buffers) */
+	private long totalNumberOfBytes;
+
+	public ResultPartition(
+			JobID jobId,
+			ResultPartitionID partitionId,
+			ResultPartitionType partitionType,
+			int numberOfSubpartitions,
+			NetworkEnvironment networkEnvironment,
+			IOManager ioManager) {
+
+		this.jobId = checkNotNull(jobId);
+		this.partitionId = checkNotNull(partitionId);
+		this.partitionType = checkNotNull(partitionType);
+		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		this.networkEnvironment = checkNotNull(networkEnvironment);
+
+		// Create the subpartitions.
+		switch (partitionType) {
+			case BLOCKING:
+				for (int i = 0; i < subpartitions.length; i++) {
+					subpartitions[i] = new SpillableSubpartition(
+							i, this, ioManager, networkEnvironment.getDefaultIOMode());
+				}
+
+				break;
+
+			case PIPELINED:
+				for (int i = 0; i < subpartitions.length; i++) {
+					subpartitions[i] = new PipelinedSubpartition(i, this);
+				}
+
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unsupported result partition type.");
+		}
+
+		// Initially, partitions should be consumed once before release.
+		pin();
+
+		LOG.debug("Initialized {}", this);
+	}
+
+	/**
+	 * Registers a buffer pool with this result partition.
+	 * <p>
+	 * There is one pool for each result partition, which is shared by all its sub partitions.
+	 * <p>
+	 * The pool is registered with the partition *after* it has been constructed in order to conform
+	 * to the life-cycle of task registrations in the {@link TaskManager}.
+	 */
+	public void registerBufferPool(BufferPool bufferPool) {
+		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == getNumberOfSubpartitions(),
+				"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");
+
+		checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
+
+		this.bufferPool = checkNotNull(bufferPool);
+
+		// If the partition type is back pressure-free, we register with the buffer pool for
+		// callbacks to release memory.
+		if (!partitionType.hasBackPressure()) {
+			bufferPool.setBufferPoolOwner(this);
+		}
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public ResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	public int getNumberOfSubpartitions() {
+		return subpartitions.length;
+	}
+
+	public BufferProvider getBufferProvider() {
+		return bufferPool;
+	}
+
+	public int getTotalNumberOfBuffers() {
+		return totalNumberOfBuffers;
+	}
+
+	public long getTotalNumberOfBytes() {
+		return totalNumberOfBytes;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Adds a buffer to the subpartition with the given index.
+	 *
+	 * <p> For PIPELINED results, this will trigger the deployment of consuming tasks after the
+	 * first buffer has been added.
+	 */
+	public void add(Buffer buffer, int subpartitionIndex) throws IOException {
+		boolean success = false;
+
+		try {
+			checkInProduceState();
+
+			final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
+
+			synchronized (subpartition) {
+				success = subpartition.add(buffer);
+
+				// Update statistics
+				totalNumberOfBuffers++;
+				totalNumberOfBytes += buffer.getSize();
+			}
+		}
+		finally {
+			if (success) {
+				notifyPipelinedConsumers();
+			}
+			else {
+				buffer.recycle();
+			}
+		}
+	}
+
+	/**
+	 * Finishes the result partition.
+	 *
+	 * <p> After this operation, it is not possible to add further data to the result partition.
+	 *
+	 * <p> For BLOCKING results, this will trigger the deployment of consuming tasks.
+	 */
+	public void finish() throws IOException {
+		boolean success = false;
+
+		try {
+			checkInProduceState();
+
+			for (ResultSubpartition subpartition : subpartitions) {
+				synchronized (subpartition) {
+					subpartition.finish();
+				}
+			}
+
+			success = true;
+		}
+		finally {
+			if (success) {
+				isFinished = true;
+
+				notifyPipelinedConsumers();
+			}
+		}
+	}
+
+	/**
+	 * Releases the result partition.
+	 */
+	public void release() {
+		if (isReleased.compareAndSet(false, true)) {
+			LOG.debug("Releasing {}", this);
+
+			try {
+				for (ResultSubpartition subpartition : subpartitions) {
+					try {
+						synchronized (subpartition) {
+							subpartition.release();
+						}
+					}
+					// Catch this in order to ensure that release is called on all subpartitions
+					catch (Throwable t) {
+						LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+					}
+				}
+			}
+			finally {
+				if (bufferPool != null) {
+					bufferPool.lazyDestroy();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Returns a read view of the requested subpartition.
+	 */
+	public ResultSubpartitionView getSubpartition(int index, Optional<BufferProvider> bufferProvider) throws IOException {
+		int refCnt = pendingReferences.get();
+
+		checkState(refCnt != -1, "Partition released.");
+		checkState(refCnt > 0, "Partition not pinned.");
+
+		return subpartitions[index].getReadView(bufferProvider);
+	}
+
+	/**
+	 * Releases buffers held by this result partition.
+	 *
+	 * <p> This is a callback from the buffer pool, which this partition only registers with when
+	 * its partition type is back pressure-free.
+	 */
+	@Override
+	public void releaseMemory(int toRelease) throws IOException {
+		checkArgument(toRelease > 0);
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			toRelease -= subpartition.releaseMemory();
+
+			// Only release as much memory as needed
+			if (toRelease <= 0) {
+				break;
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+				+ subpartitions.length + " subpartitions, "
+				+ pendingReferences + " pending references]";
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Pins the result partition.
+	 *
+	 * <p> The partition can only be released after each subpartition has been consumed once per pin
+	 * operation.
+	 */
+	void pin() {
+		while (true) {
+			int refCnt = pendingReferences.get();
+
+			if (refCnt >= 0) {
+				if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
+					break;
+				}
+			}
+			else {
+				throw new IllegalStateException("Released.");
+			}
+		}
+	}
+
+	/**
+	 * Notification when a subpartition has been consumed.
+	 */
+	void onConsumedSubpartition(int subpartitionIndex) {
+
+		if (isReleased.get()) {
+			return;
+		}
+
+		int refCnt = pendingReferences.decrementAndGet();
+
+		if (refCnt == 0) {
+			networkEnvironment.getPartitionManager().onConsumedPartition(this);
+		}
+		else if (refCnt < 0) {
+			throw new IllegalStateException("All references released.");
+		}
+
+		LOG.debug("{}: Received release notification for subpartition {} (reference count now at: {}).",
+				this, subpartitionIndex, pendingReferences);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void checkInProduceState() {
+		checkState(!isFinished, "Partition already finished.");
+	}
+
+	/**
+	 * Notifies pipelined consumers of this result partition once.
+	 */
+	private void notifyPipelinedConsumers() throws IOException {
+		if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
+			ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
+
+			Future<Object> futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg,
+					networkEnvironment.getJobManagerTimeout());
+
+			futureResponse.onFailure(new OnFailure() {
+				@Override
+				public void onFailure(Throwable failure) throws Throwable {
+					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
+
+					// Fail task at the TaskManager
+					FailTask failMsg = new FailTask(partitionId.getProducerId(),
+							new RuntimeException("Could not schedule or update consumers at " +
+									"the JobManager.", failure));
+
+					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+				}
+			}, AkkaUtils.globalExecutionContext());
+
+			hasNotifiedPipelinedConsumers = true;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
new file mode 100644
index 0000000..af2970d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+
+/**
+ * Runtime identifier of a produced {@link IntermediateResultPartition}.
+ *
+ * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely
+ * identify a result partition. It needs to be associated with the producing task as well to ensure
+ * correct tracking of failed/restarted tasks.
+ */
+public final class ResultPartitionID implements Serializable {
+
+	private final IntermediateResultPartitionID partitionId;
+
+	private final ExecutionAttemptID producerId;
+
+	public ResultPartitionID() {
+		this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+	}
+
+	public ResultPartitionID(IntermediateResultPartitionID partitionId, ExecutionAttemptID producerId) {
+		this.partitionId = partitionId;
+		this.producerId = producerId;
+	}
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	public ExecutionAttemptID getProducerId() {
+		return producerId;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj.getClass() == ResultPartitionID.class) {
+			ResultPartitionID o = (ResultPartitionID) obj;
+
+			return o.getPartitionId().equals(partitionId) && o.getProducerId().equals(producerId);
+		}
+
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return partitionId.hashCode() ^ producerId.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		return partitionId.toShortString() + "@" + producerId.toShortString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
new file mode 100644
index 0000000..c120de8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Table;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * The result partition manager keeps track of all currently produced/consumed partitions of a
+ * task manager.
+ */
+public class ResultPartitionManager implements ResultPartitionProvider {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);
+
+	public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition>
+			registeredPartitions = HashBasedTable.create();
+
+	private boolean isShutdown;
+
+	public void registerIntermediateResultPartition(ResultPartition partition) throws IOException {
+		synchronized (registeredPartitions) {
+			checkState(!isShutdown, "Result partition manager already shut down.");
+
+			ResultPartitionID partitionId = partition.getPartitionId();
+
+			ResultPartition previous = registeredPartitions.put(
+					partitionId.getProducerId(), partitionId.getPartitionId(), partition);
+
+			if (previous != null) {
+				throw new IllegalStateException("Result partition already registered.");
+			}
+
+			LOG.debug("Registered {}.", partition);
+		}
+	}
+
+	@Override
+	public ResultSubpartitionView getSubpartition(
+			ResultPartitionID partitionId,
+			int subpartitionIndex,
+			Optional<BufferProvider> bufferProvider) throws IOException {
+
+		synchronized (registeredPartitions) {
+			final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
+					partitionId.getPartitionId());
+
+			if (partition == null) {
+				throw new IOException("Unknown partition " + partitionId + ".");
+			}
+
+			LOG.debug("Requested partition {}.", partition);
+
+			return partition.getSubpartition(subpartitionIndex, bufferProvider);
+		}
+	}
+
+	public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
+		synchronized (registeredPartitions) {
+			final Map<IntermediateResultPartitionID, ResultPartition> partitions =
+					registeredPartitions.row(executionId);
+
+			for (ResultPartition partition : partitions.values()) {
+				partition.release();
+			}
+
+			for (IntermediateResultPartitionID partitionId : ImmutableList
+					.copyOf(partitions.keySet())) {
+
+				registeredPartitions.remove(executionId, partitionId);
+			}
+
+			LOG.debug("Released all partitions produced by {}.", executionId);
+		}
+	}
+
+	public void shutdown() {
+		synchronized (registeredPartitions) {
+
+			LOG.debug("Releasing {} partitions because of shutdown.",
+					registeredPartitions.values().size());
+
+			for (ResultPartition partition : registeredPartitions.values()) {
+				partition.release();
+			}
+
+			registeredPartitions.clear();
+
+			isShutdown = true;
+
+			LOG.debug("Successful shutdown.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Notifications
+	// ------------------------------------------------------------------------
+
+	void onConsumedPartition(ResultPartition partition) {
+		final ResultPartition previous;
+
+		LOG.debug("Received consume notification from {}.", partition);
+
+		synchronized (registeredPartitions) {
+			ResultPartitionID partitionId = partition.getPartitionId();
+
+			previous = registeredPartitions.remove(partitionId.getProducerId(),
+					partitionId.getPartitionId());
+		}
+
+		// Release the partition if it was successfully removed
+		if (partition == previous) {
+			partition.release();
+
+			LOG.debug("Released {}.", partition);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
new file mode 100644
index 0000000..1f35f59
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+
+import java.io.IOException;
+
+public interface ResultPartitionProvider {
+
+	/**
+	 * Returns a view of the requested subpartition of the given result partition.
+	 */
+	ResultSubpartitionView getSubpartition(ResultPartitionID partitionId, int index, Optional<BufferProvider> bufferProvider) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
new file mode 100644
index 0000000..65d49ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+public enum ResultPartitionType {
+
+	BLOCKING(true, false, false),
+
+	PIPELINED(false, true, true),
+
+	PIPELINED_PERSISTENT(true, true, true);
+
+	/** Does the partition live longer than the consuming task? */
+	private final boolean isPersistent;
+
+	/** Can the partition be consumed while being produced? */
+	private final boolean isPipelined;
+
+	/** Does the partition produce back pressure when not consumed? */
+	private final boolean hasBackPressure;
+
+	/**
+	 * Specifies the behaviour of an intermediate result partition at runtime.
+	 */
+	ResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) {
+		this.isPersistent = isPersistent;
+		this.isPipelined = isPipelined;
+		this.hasBackPressure = hasBackPressure;
+	}
+
+	public boolean hasBackPressure() {
+		return hasBackPressure;
+	}
+
+	public boolean isBlocking() {
+		return !isPipelined;
+	}
+
+	public boolean isPipelined() {
+		return isPipelined;
+	}
+
+	public boolean isPersistent() {
+		return isPersistent;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
new file mode 100644
index 0000000..1538a1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+
+import java.io.IOException;
+
+/**
+ * A single subpartition of a {@link ResultPartition} instance.
+ */
+public abstract class ResultSubpartition {
+
+	/** The index of the subpartition at the parent partition. */
+	protected final int index;
+
+	/** The parent partition this subpartition belongs to. */
+	protected final ResultPartition parent;
+
+	// - Statistics ----------------------------------------------------------
+
+	/** The total number of buffers (both data and event buffers) */
+	private int totalNumberOfBuffers;
+
+	/** The total number of bytes (both data and event buffers) */
+	private long totalNumberOfBytes;
+
+	public ResultSubpartition(int index, ResultPartition parent) {
+		this.index = index;
+		this.parent = parent;
+	}
+
+	protected void updateStatistics(Buffer buffer) {
+		totalNumberOfBuffers++;
+		totalNumberOfBytes += buffer.getSize();
+	}
+
+	protected int getTotalNumberOfBuffers() {
+		return totalNumberOfBuffers;
+	}
+
+	protected long getTotalNumberOfBytes() {
+		return totalNumberOfBytes;
+	}
+
+	/**
+	 * Notifies the parent partition about a consumed {@link ResultSubpartitionView}.
+	 */
+	protected void onConsumedSubpartition() {
+		parent.onConsumedSubpartition(index);
+	}
+
+	abstract public boolean add(Buffer buffer) throws IOException;
+
+	abstract public void finish() throws IOException;
+
+	abstract public void release() throws IOException;
+
+	abstract public ResultSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) throws IOException;
+
+	abstract int releaseMemory() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
new file mode 100644
index 0000000..82cee6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+
+/**
+ * A view to consume a {@link ResultSubpartition} instance.
+ */
+public interface ResultSubpartitionView {
+
+	/**
+	 * Returns the next {@link Buffer} instance of this subpartition view.
+	 * <p>
+	 * If there is currently no instance available, it will return <code>null</code>.
+	 * This might happen for example when the producer of a pipelined subpartition is slower
+	 * than the consumer or a spilled subpartition needs to read in more data.
+	 * <p>
+	 * <strong>Important</strong>: The consumer has to make sure that each
+	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
+	 * after it has been consumed.
+	 */
+	Buffer getNextBuffer() throws IOException, InterruptedException;
+
+	/**
+	 * Subscribes to data availability notifications.
+	 * <p>
+	 * Returns whether the subscription was successful. A subscription fails
+	 * if there is data available.
+	 */
+	boolean registerListener(NotificationListener listener) throws IOException;
+
+
+	void releaseAllResources() throws IOException;
+
+	void notifySubpartitionConsumed() throws IOException;
+
+	boolean isReleased();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
new file mode 100644
index 0000000..da6a847
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import com.google.common.base.Optional;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A blocking in-memory subpartition, which is able to spill to disk.
+ *
+ * <p> Buffers are kept in-memory as long as possible. If not possible anymore, all buffers are
+ * spilled to disk.
+ */
+class SpillableSubpartition extends ResultSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
+
+	/** All in-memory buffers of this subpartition. Its monitor guards the state of this class. */
+	final ArrayList<Buffer> buffers = new ArrayList<Buffer>();
+
+	/** The I/O manager to create the spill writer from. */
+	final IOManager ioManager;
+
+	/** The default I/O mode to use. */
+	final IOMode ioMode;
+
+	/** The writer used for spilling. As long as this is null, we are in-memory. */
+	BufferFileWriter spillWriter;
+
+	/** Flag indicating whether the subpartition has been finished. */
+	private boolean isFinished;
+
+	/** Flag indicating whether the subpartition has been released. */
+	private boolean isReleased;
+
+	/** The read view to consume this subpartition. */
+	private ResultSubpartitionView readView;
+
+	SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) {
+		super(index, parent);
+
+		this.ioManager = checkNotNull(ioManager);
+		this.ioMode = checkNotNull(ioMode);
+	}
+
+	/** Adds a buffer; returns <code>false</code> if already finished or released. */
+	@Override
+	public boolean add(Buffer buffer) throws IOException {
+		checkNotNull(buffer);
+
+		synchronized (buffers) {
+			if (isFinished || isReleased) {
+				return false;
+			}
+
+			// In-memory
+			if (spillWriter == null) {
+				buffers.add(buffer);
+				return true;
+			}
+		}
+
+		// Else: Spilling
+		spillWriter.writeBlock(buffer);
+
+		return true;
+	}
+
+	/** Appends the end-of-partition event and marks this subpartition as finished. */
+	@Override
+	public void finish() throws IOException {
+		synchronized (buffers) {
+			if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) {
+				// If we are spilling/have spilled, wait for the writer to finish.
+				if (spillWriter != null) {
+					spillWriter.close();
+				}
+				isFinished = true;
+			}
+		}
+	}
+
+	/** Recycles all in-memory buffers, deletes the spill file (if any) and notifies the read view. */
+	@Override
+	public void release() throws IOException {
+		final ResultSubpartitionView view;
+
+		synchronized (buffers) {
+			if (isReleased) {
+				return;
+			}
+
+			// Recycle all in-memory buffers
+			for (Buffer buffer : buffers) {
+				buffer.recycle();
+			}
+			buffers.clear();
+			buffers.trimToSize();
+
+			// If we are spilling/have spilled, wait for the writer to finish and delete the file.
+			if (spillWriter != null) {
+				spillWriter.closeAndDelete();
+			}
+
+			// Get the view...
+			view = readView;
+			readView = null;
+
+			isReleased = true;
+		}
+
+		// Notify the view outside of the lock. NOTE(review): its resources are not released here - confirm.
+		if (view != null) {
+			view.notifySubpartitionConsumed();
+		}
+	}
+
+	/** Spills all in-memory buffers on the first call and returns how many were handed to the writer. */
+	@Override
+	public int releaseMemory() throws IOException {
+		synchronized (buffers) {
+			if (spillWriter == null) {
+				// Create the spill writer
+				spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
+				final int numberOfBuffers = buffers.size();
+
+				// Spill all buffers
+				for (int i = 0; i < numberOfBuffers; i++) {
+					spillWriter.writeBlock(buffers.remove(0));
+				}
+
+				LOG.debug("Spilling {} buffers of {}.", numberOfBuffers, this);
+
+				return numberOfBuffers;
+			}
+		}
+
+		// Else: We have already spilled and don't hold any buffers
+		return 0;
+	}
+
+	/** Creates the single read view of this finished subpartition (in-memory or spilled variant). */
+	@Override
+	public ResultSubpartitionView getReadView(Optional<BufferProvider> bufferProvider) throws IOException {
+		synchronized (buffers) {
+			if (!isFinished) {
+				throw new IllegalStateException("Subpartition has not been finished yet, " +
+						"but blocking subpartitions can only be consumed after they have " +
+						"been finished.");
+			}
+
+			if (readView != null) {
+				throw new IllegalStateException("Subpartition is being or already has been " +
+						"consumed, but we currently allow subpartitions to only be consumed once.");
+			}
+
+			// Spilled if the writer is closed or has no outstanding write requests
+			boolean isSpilled = spillWriter != null && (spillWriter.isClosed()
+					|| spillWriter.getNumberOfOutstandingRequests() == 0);
+			if (isSpilled) {
+				if (ioMode.isSynchronous()) {
+					readView = new SpilledSubpartitionViewSyncIO(
+							this,
+							bufferProvider.get().getMemorySegmentSize(),
+							spillWriter.getChannelID(),
+							0);
+				}
+				else {
+					readView = new SpilledSubpartitionViewAsyncIO(
+							this,
+							bufferProvider.get(),
+							ioManager,
+							spillWriter.getChannelID(),
+							0);
+				}
+			}
+			else {
+				readView = new SpillableSubpartitionView(
+						this, bufferProvider.get(), buffers.size(), ioMode);
+			}
+
+			return readView;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SpillableSubpartition [%d number of buffers (%d bytes), " +
+						"finished? %s, read view? %s, spilled? %s]",
+				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
+				spillWriter != null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
new file mode 100644
index 0000000..59b1464
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** View over a {@link SpillableSubpartition}, reading from memory or, once spilled, from disk. */
+class SpillableSubpartitionView implements ResultSubpartitionView {
+	/** The subpartition this view belongs to. */
+	private final SpillableSubpartition parent;
+
+	/** The buffer provider to read buffers into (spilling case). */
+	private final BufferProvider bufferProvider;
+
+	/** The number of buffers in-memory at the subpartition. */
+	private final int numberOfBuffers;
+
+	/** The default I/O mode to use. */
+	private final IOMode ioMode;
+	/** The view over the spill file; created lazily once the parent has spilled. */
+	private ResultSubpartitionView spilledView;
+	/** Index of the next in-memory buffer to return. */
+	private int currentQueuePosition;
+	/** Bytes returned in-memory so far (incl. 8 header bytes per buffer); seek offset after a spill. */
+	private long currentBytesRead;
+	/** Flag indicating whether this view has been released. */
+	private final AtomicBoolean isReleased = new AtomicBoolean(false);
+
+	public SpillableSubpartitionView(
+			SpillableSubpartition parent,
+			BufferProvider bufferProvider,
+			int numberOfBuffers,
+			IOMode ioMode) {
+
+		this.parent = checkNotNull(parent);
+		this.bufferProvider = checkNotNull(bufferProvider);
+		checkArgument(numberOfBuffers >= 0);
+		this.numberOfBuffers = numberOfBuffers;
+		this.ioMode = checkNotNull(ioMode);
+	}
+
+	/** Returns the next buffer from memory or the spill file, or <code>null</code> if none is available. */
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		if (isReleased.get()) {
+			return null;
+		}
+
+		// 1) In-memory
+		synchronized (parent.buffers) {
+			if (parent.spillWriter == null) {
+				if (currentQueuePosition < numberOfBuffers) {
+					Buffer buffer = parent.buffers.get(currentQueuePosition);
+
+					buffer.retain();
+
+					// TODO Fix hard coding of 8 bytes for the header
+					currentBytesRead += buffer.getSize() + 8;
+					currentQueuePosition++;
+
+					return buffer;
+				}
+
+				return null;
+			}
+		}
+
+		// 2) Spilled
+		if (spilledView != null) {
+			return spilledView.getNextBuffer();
+		}
+
+		// 3) Spilling
+		// Make sure that all buffers are written before consuming them. We can't block here,
+		// because this might be called from a network I/O thread.
+		if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
+			return null;
+		}
+		// Resume after the buffers already returned from memory instead of re-reading them.
+		if (ioMode.isSynchronous()) {
+			spilledView = new SpilledSubpartitionViewSyncIO(
+					parent,
+					bufferProvider.getMemorySegmentSize(),
+					parent.spillWriter.getChannelID(),
+					currentBytesRead);
+		}
+		else {
+			spilledView = new SpilledSubpartitionViewAsyncIO(
+					parent,
+					bufferProvider,
+					parent.ioManager,
+					parent.spillWriter.getChannelID(),
+					currentBytesRead);
+		}
+
+		return spilledView.getNextBuffer();
+	}
+
+	@Override
+	public boolean registerListener(NotificationListener listener) throws IOException {
+		if (spilledView == null) {
+			synchronized (parent.buffers) {
+				// Didn't spill yet, buffers should be in-memory
+				if (parent.spillWriter == null) {
+					return false;
+				}
+			}
+
+			// Spilling
+			if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
+				return parent.spillWriter.registerAllRequestsProcessedListener(listener);
+			}
+
+			return false;
+		}
+
+		return spilledView.registerListener(listener);
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		parent.onConsumedSubpartition();
+	}
+
+	@Override
+	public void releaseAllResources() throws IOException {
+		if (isReleased.compareAndSet(false, true)) {
+			if (spilledView != null) {
+				spilledView.releaseAllResources();
+			}
+		}
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
new file mode 100644
index 0000000..1d4a9ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * View over a spilled subpartition.
+ *
+ * <p> Reads are triggered asynchronously in batches of configurable size.
+ */
+class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
+
+	private static final int DEFAULT_READ_BATCH_SIZE = 2;
+
+	private final Object lock = new Object();
+
+	/** The subpartition this view belongs to. */
+	private final ResultSubpartition parent;
+
+	/** The buffer provider to get the buffer read everything into. */
+	private final BufferProvider bufferProvider;
+
+	/** The buffer availability listener to be notified on available buffers. */
+	private final BufferProviderCallback bufferAvailabilityListener;
+
+	/** The size of read batches. */
+	private final int readBatchSize;
+
+	/**
+	 * The size of the current batch (>= 0 and <= the configured batch size). Reads are only
+	 * triggered when the size of the current batch is 0.
+	 */
+	private final AtomicInteger currentBatchSize = new AtomicInteger();
+
+	/** The asynchronous file reader to do the actual I/O. */
+	private final BufferFileReader asyncFileReader;
+
+	/** The buffers, which have been returned from the file reader. */
+	private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>();
+
+	/** A data availability listener. */
+	private NotificationListener registeredListener;
+
+	/** Error, which has occurred in the I/O thread. */
+	private volatile IOException errorInIOThread;
+
+	/** Flag indicating whether all resources have been released. */
+	private volatile boolean isReleased;
+
+	/** Flag indicating whether we reached EOF at the file reader. */
+	private volatile boolean hasReachedEndOfFile;
+
+	SpilledSubpartitionViewAsyncIO(
+			ResultSubpartition parent,
+			BufferProvider bufferProvider,
+			IOManager ioManager,
+			FileIOChannel.ID channelId,
+			long initialSeekPosition) throws IOException {
+
+		this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, DEFAULT_READ_BATCH_SIZE);
+	}
+
+	SpilledSubpartitionViewAsyncIO(
+			ResultSubpartition parent,
+			BufferProvider bufferProvider,
+			IOManager ioManager,
+			FileIOChannel.ID channelId,
+			long initialSeekPosition,
+			int readBatchSize) throws IOException {
+
+		checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
+		checkArgument(readBatchSize >= 1, "Batch read size < 1.");
+
+		this.parent = checkNotNull(parent);
+		this.bufferProvider = checkNotNull(bufferProvider);
+		this.bufferAvailabilityListener = new BufferProviderCallback(this);
+
+		this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
+
+		if (initialSeekPosition > 0) {
+			asyncFileReader.seekToPosition(initialSeekPosition);
+		}
+
+		this.readBatchSize = readBatchSize;
+
+		// Trigger the initial read requests
+		readNextBatchAsync();
+	}
+
+	@Override
+	public Buffer getNextBuffer() throws IOException {
+		checkError();
+
+		final Buffer buffer = returnedBuffers.poll();
+
+		// No buffer returned from the I/O thread currently. Either the current batch is in progress
+		// or we trigger the next one.
+		if (buffer == null) {
+			if (currentBatchSize.get() == 0) {
+				readNextBatchAsync();
+			}
+		}
+		else {
+			currentBatchSize.decrementAndGet();
+		}
+
+		return buffer;
+	}
+
+	@Override
+	public boolean registerListener(NotificationListener listener) throws IOException {
+		checkNotNull(listener);
+
+		checkError();
+
+		synchronized (lock) {
+			if (isReleased || !returnedBuffers.isEmpty()) {
+				return false;
+			}
+
+			if (registeredListener == null) {
+				registeredListener = listener;
+
+				return true;
+			}
+		}
+
+		throw new IllegalStateException("Already registered listener.");
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		parent.onConsumedSubpartition();
+	}
+
+	@Override
+	public void releaseAllResources() throws IOException {
+		try {
+			synchronized (lock) {
+				if (!isReleased) {
+					// Recycle all buffers. Buffers, which are in flight are recycled as soon as
+					// they return from the I/O thread.
+					Buffer buffer;
+					while ((buffer = returnedBuffers.poll()) != null) {
+						buffer.recycle();
+					}
+
+					isReleased = true;
+				}
+			}
+		}
+		finally {
+			asyncFileReader.close();
+		}
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased;
+	}
+
+	/**
+	 * Requests buffers from the buffer provider and triggers asynchronous read requests to fill
+	 * them.
+	 *
+	 * <p> The number of requested buffers/triggered I/O read requests per call depends on the
+	 * configured size of batch reads.
+	 */
+	private void readNextBatchAsync() throws IOException {
+		// This does not need to be fully synchronized with actually reaching EOF as long as
+		// we eventually notice it. In the worst case, we trigger some discarded reads and
+		// notice it when the buffers are returned.
+		//
+		// We only trigger reads if the current batch size is 0.
+		if (hasReachedEndOfFile || currentBatchSize.get() != 0) {
+			return;
+		}
+
+		// Number of successful buffer requests or callback registrations. The callback will
+		// trigger the read as soon as a buffer becomes available again.
+		int i = 0;
+		while (i < readBatchSize) {
+			final Buffer buffer = bufferProvider.requestBuffer();
+
+			if (buffer == null) {
+				// Listen for buffer availability.
+				currentBatchSize.incrementAndGet();
+
+				if (bufferProvider.addListener(bufferAvailabilityListener)) {
+					i++;
+				}
+				else if (bufferProvider.isDestroyed()) {
+					currentBatchSize.decrementAndGet();
+					return;
+				}
+				else {
+					// Buffer available again
+					currentBatchSize.decrementAndGet();
+				}
+			}
+			else {
+				// Count the successful request, so that the batch stays bounded by readBatchSize.
+				currentBatchSize.incrementAndGet();
+				asyncFileReader.readInto(buffer);
+				i++;
+			}
+		}
+	}
+
+	/**
+	 * Triggers an asynchronous read into a buffer, which became available at the buffer provider.
+	 *
+	 * <p> Note: This method is called from the thread recycling the available buffer.
+	 */
+	private void onAvailableBuffer(Buffer buffer) {
+		try {
+			asyncFileReader.readInto(buffer);
+		}
+		catch (IOException e) {
+			notifyError(e);
+		}
+	}
+
+	/**
+	 * Returns a successful buffer read request.
+	 *
+	 * <p> Note: This method is always called from the same I/O thread.
+	 */
+	private void returnBufferFromIOThread(Buffer buffer) {
+		final NotificationListener listener;
+
+		synchronized (lock) {
+			if (hasReachedEndOfFile || isReleased) {
+				buffer.recycle();
+
+				return;
+			}
+
+			returnedBuffers.add(buffer);
+
+			listener = registeredListener;
+			registeredListener = null;
+
+			// If this was the last buffer before we reached EOF, set the corresponding flag to
+			// ensure that further buffers are correctly recycled and eventually no further reads
+			// are triggered.
+			if (asyncFileReader.hasReachedEndOfFile()) {
+				hasReachedEndOfFile = true;
+			}
+		}
+
+		if (listener != null) {
+			listener.onNotification();
+		}
+	}
+
+	/**
+	 * Notifies the view about an error.
+	 */
+	private void notifyError(IOException error) {
+		if (errorInIOThread == null) {
+			errorInIOThread = error;
+		}
+
+		final NotificationListener listener;
+
+		synchronized (lock) {
+			listener = registeredListener;
+			registeredListener = null;
+		}
+
+		if (listener != null) {
+			listener.onNotification();
+		}
+	}
+
+	/**
+	 * Checks whether an error has been reported and rethrows the respective Exception, if available.
+	 */
+	private void checkError() throws IOException {
+		if (errorInIOThread != null) {
+			throw errorInIOThread;
+		}
+	}
+
+	/**
+	 * Callback from the I/O thread.
+	 *
+	 * <p> Successful buffer read requests add the buffer to the subpartition view, and failed ones
+	 * notify about the error.
+	 */
+	private static class IOThreadCallback implements RequestDoneCallback<Buffer> {
+
+		private final SpilledSubpartitionViewAsyncIO subpartitionView;
+
+		public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
+			this.subpartitionView = subpartitionView;
+		}
+
+		@Override
+		public void requestSuccessful(Buffer buffer) {
+			subpartitionView.returnBufferFromIOThread(buffer);
+		}
+
+		@Override
+		public void requestFailed(Buffer buffer, IOException error) {
+			// Recycle the buffer and forward the error
+			buffer.recycle();
+
+			subpartitionView.notifyError(error);
+		}
+	}
+
+	/**
+	 * Callback from the buffer provider.
+	 */
+	private static class BufferProviderCallback implements EventListener<Buffer> {
+
+		private final SpilledSubpartitionViewAsyncIO subpartitionView;
+
+		private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
+			this.subpartitionView = subpartitionView;
+		}
+
+		@Override
+		public void onEvent(Buffer buffer) {
+			if (buffer == null) {
+				return;
+			}
+
+			subpartitionView.onAvailableBuffer(buffer);
+		}
+	}
+}


Mime
View raw message