flink-commits mailing list archives

From: u..@apache.org
Subject: [07/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
Date: Wed, 18 Mar 2015 16:48:56 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
new file mode 100644
index 0000000..62ebddf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * View over a spilled subpartition.
+ *
+ * <p> Reads are done synchronously.
+ */
+class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
+
+	/** The subpartition this view belongs to. */
+	private final ResultSubpartition parent;
+
+	/** The synchronous file reader to do the actual I/O. */
+	private final BufferFileReader fileReader;
+
+	/** The buffer pool to read data into. */
+	private final SpillReadBufferPool bufferPool;
+
+	/** Flag indicating whether all resources have been released. */
+	private AtomicBoolean isReleased = new AtomicBoolean();
+
+	SpilledSubpartitionViewSyncIO(
+			ResultSubpartition parent,
+			int memorySegmentSize,
+			FileIOChannel.ID channelId,
+			long initialSeekPosition) throws IOException {
+
+		checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
+
+		this.parent = checkNotNull(parent);
+
+		this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
+
+		this.fileReader = new SynchronousBufferFileReader(channelId, false);
+
+		if (initialSeekPosition > 0) {
+			fileReader.seekToPosition(initialSeekPosition);
+		}
+	}
+
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+
+		if (fileReader.hasReachedEndOfFile()) {
+			return null;
+		}
+
+		// It's OK to request the buffer in a blocking fashion as the buffer pool is NOT shared
+		// among all consumed subpartitions.
+		final Buffer buffer = bufferPool.requestBufferBlocking();
+
+		fileReader.readInto(buffer);
+
+		return buffer;
+	}
+
+	@Override
+	public boolean registerListener(NotificationListener listener) throws IOException {
+		return false;
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		parent.onConsumedSubpartition();
+	}
+
+	@Override
+	public void releaseAllResources() throws IOException {
+		if (isReleased.compareAndSet(false, true)) {
+			fileReader.close();
+			bufferPool.destroy();
+		}
+	}
+
+	@Override
+	public boolean isReleased() {
+		return isReleased.get();
+	}
+
+	/**
+	 * A buffer pool that provides buffers to read the file into.
+	 *
+	 * <p> This pool ensures that a consuming input gate makes progress in all cases, even when all
+	 * buffers of the input gate buffer pool have been requested by remote input channels.
+	 *
+	 * TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
+	 * consumed subpartition.
+	 */
+	private static class SpillReadBufferPool implements BufferRecycler {
+
+		private final Queue<Buffer> buffers;
+
+		private boolean isDestroyed;
+
+		public SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
+			this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
+
+			synchronized (buffers) {
+				for (int i = 0; i < numberOfBuffers; i++) {
+					buffers.add(new Buffer(new MemorySegment(new byte[memorySegmentSize]), this));
+				}
+			}
+		}
+
+		@Override
+		public void recycle(MemorySegment memorySegment) {
+			synchronized (buffers) {
+				if (isDestroyed) {
+					memorySegment.free();
+				}
+				else {
+					buffers.add(new Buffer(memorySegment, this));
+					buffers.notifyAll();
+				}
+			}
+		}
+
+		private Buffer requestBufferBlocking() throws InterruptedException {
+			synchronized (buffers) {
+				while (true) {
+					if (isDestroyed) {
+						return null;
+					}
+
+					Buffer buffer = buffers.poll();
+
+					if (buffer != null) {
+						return buffer;
+					}
+					// Else: wait for a buffer
+					buffers.wait();
+				}
+			}
+		}
+
+		private void destroy() {
+			synchronized (buffers) {
+				isDestroyed = true;
+				buffers.notifyAll();
+			}
+		}
+	}
+}
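
The view above hands out buffers from a private two-buffer pool, so a slow consumer blocks inside requestBufferBlocking() without starving other channels. A minimal consumption sketch (editorial, assuming a view instance created as above and the Buffer recycling contract from this commit):

    Buffer buffer;
    while ((buffer = view.getNextBuffer()) != null) {
        try {
            // read records from buffer.getMemorySegment(), up to buffer.getSize()
        }
        finally {
            // Recycling returns the segment to the SpillReadBufferPool and
            // unblocks the next requestBufferBlocking() call.
            buffer.recycle();
        }
    }
    view.notifySubpartitionConsumed();
    view.releaseAllResources();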

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index fb41549..855509c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -19,20 +19,19 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.io.IOException;
 
 /**
- * An input channel is the consumer of a single subpartition of an {@link IntermediateResultPartitionQueue}.
+ * An input channel consumes a single {@link ResultSubpartitionView}.
  * <p>
  * For each channel, the consumption life cycle is as follows:
  * <ol>
- * <li>{@link #requestIntermediateResultPartition(int)}</li>
- * <li>{@link #getNextBuffer()} until {@link #isReleased()}</li>
+ * <li>{@link #requestSubpartition(int)}</li>
+ * <li>{@link #getNextBuffer()}</li>
  * <li>{@link #releaseAllResources()}</li>
  * </ol>
  */
@@ -40,17 +39,13 @@ public abstract class InputChannel {
 
 	protected final int channelIndex;
 
-	protected final ExecutionAttemptID producerExecutionId;
-
-	protected final IntermediateResultPartitionID partitionId;
+	protected final ResultPartitionID partitionId;
 
 	protected final SingleInputGate inputGate;
 
-	protected InputChannel(SingleInputGate inputGate, int channelIndex, ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId) {
+	protected InputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId) {
 		this.inputGate = inputGate;
 		this.channelIndex = channelIndex;
-		this.producerExecutionId = producerExecutionId;
 		this.partitionId = partitionId;
 	}
 
@@ -62,19 +57,6 @@ public abstract class InputChannel {
 		return channelIndex;
 	}
 
-	public ExecutionAttemptID getProducerExecutionId() {
-		return producerExecutionId;
-	}
-
-	public IntermediateResultPartitionID getPartitionId() {
-		return partitionId;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("[%s:%s]", producerExecutionId, partitionId);
-	}
-
 	/**
 	 * Notifies the owning {@link SingleInputGate} about an available {@link Buffer} instance.
 	 */
@@ -93,12 +75,12 @@ public abstract class InputChannel {
 	 * The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
-	public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException;
+	public abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException;
 
 	/**
 	 * Returns the next buffer from the consumed subpartition.
 	 */
-	public abstract Buffer getNextBuffer() throws IOException;
+	public abstract Buffer getNextBuffer() throws IOException, InterruptedException;
 
 	// ------------------------------------------------------------------------
 	// Task events
@@ -120,6 +102,8 @@ public abstract class InputChannel {
 
 	public abstract boolean isReleased();
 
+	public abstract void notifySubpartitionConsumed() throws IOException;
+
 	/**
 	 * Releases all resources of the channel.
 	 */
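
Taken together, the javadoc above fixes the call order for a consumer. The loop below is a sketch under the assumption that end-of-input is signalled by a serialized EndOfPartitionEvent, which is how SingleInputGate detects it later in this commit:

    channel.requestSubpartition(subpartitionIndex);

    boolean finished = false;
    while (!finished) {
        Buffer buffer = channel.getNextBuffer();
        if (buffer == null) {
            continue; // nothing queued right now; the real code parks on a
                      // notification listener instead of busy-polling
        }
        if (!buffer.isBuffer()) {
            AbstractEvent event = EventSerializer.fromBuffer(
                    buffer, getClass().getClassLoader());
            finished = event.getClass() == EndOfPartitionEvent.class;
        }
        buffer.recycle();
    }

    channel.notifySubpartitionConsumed();
    channel.releaseAllResources();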

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index c05952f..65f2627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.base.Optional;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,24 +43,24 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
 
-	private final IntermediateResultPartitionManager partitionManager;
+	private final ResultPartitionManager partitionManager;
 
 	private final TaskEventDispatcher taskEventDispatcher;
 
-	private IntermediateResultPartitionQueueIterator queueIterator;
+	private ResultSubpartitionView queueIterator;
 
-	private boolean isReleased;
+	private volatile boolean isReleased;
 
 	private volatile Buffer lookAhead;
 
 	public LocalInputChannel(
-			SingleInputGate gate, int channelIndex,
-			ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId,
-			IntermediateResultPartitionManager partitionManager,
+			SingleInputGate gate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher) {
 
-		super(gate, channelIndex, producerExecutionId, partitionId);
+		super(gate, channelIndex, partitionId);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -70,20 +71,23 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
+	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (queueIterator == null) {
-			LOG.debug("Requesting LOCAL queue {} from partition {} produced by {}.", queueIndex, partitionId,
-						producerExecutionId);
+			LOG.debug("Requesting LOCAL queue {} of partition {}.", subpartitionIndex, partitionId);
 
-			queueIterator = partitionManager.getIntermediateResultPartitionIterator(
-					producerExecutionId, partitionId, queueIndex, Optional.of(inputGate.getBufferProvider()));
+			queueIterator = partitionManager
+					.getSubpartition(partitionId, subpartitionIndex, Optional.of(inputGate.getBufferProvider()));
+
+			if (queueIterator == null) {
+				throw new IOException("Error requesting subpartition.");
+			}
 
 			getNextLookAhead();
 		}
 	}
 
 	@Override
-	public Buffer getNextBuffer() throws IOException {
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
 		checkState(queueIterator != null, "Queried for a buffer before requesting a queue.");
 
 		// After subscribe notification
@@ -94,6 +98,13 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 		Buffer next = lookAhead;
 		lookAhead = null;
 
+		if (!next.isBuffer() && EventSerializer
+				.fromBuffer(next, getClass().getClassLoader())
+				.getClass() == EndOfPartitionEvent.class) {
+
+			return next;
+		}
+
 		getNextLookAhead();
 
 		return next;
@@ -107,7 +118,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	public void sendTaskEvent(TaskEvent event) throws IOException {
 		checkState(queueIterator != null, "Tried to send task event to producer before requesting a queue.");
 
-		if (!taskEventDispatcher.publish(producerExecutionId, partitionId, event)) {
+		if (!taskEventDispatcher.publish(partitionId, event)) {
 			throw new IOException("Error while publishing event " + event + " to producer. The producer could not be found.");
 		}
 	}
@@ -121,6 +132,13 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 		return isReleased;
 	}
 
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		if (queueIterator != null) {
+			queueIterator.notifySubpartitionConsumed();
+		}
+	}
+
 	/**
 	 * Releases the look ahead {@link Buffer} instance and discards the queue
 	 * iterator.
@@ -134,7 +152,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			}
 
 			if (queueIterator != null) {
-				queueIterator.discard();
+				queueIterator.releaseAllResources();
 				queueIterator = null;
 			}
 
@@ -144,7 +162,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	public String toString() {
-		return "LOCAL " + super.toString();
+		return "LocalInputChannel [" + partitionId + "]";
 	}
 
 	// ------------------------------------------------------------------------
@@ -153,12 +171,21 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	public void onNotification() {
-		notifyAvailableBuffer();
+		if (isReleased) {
+			return;
+		}
+
+		try {
+			getNextLookAhead();
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
 	}
 
 	// ------------------------------------------------------------------------
 
-	private void getNextLookAhead() throws IOException {
+	private void getNextLookAhead() throws IOException, InterruptedException {
 		while (true) {
 			lookAhead = queueIterator.getNextBuffer();
 
@@ -167,7 +194,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 				break;
 			}
 
-			if (queueIterator.subscribe(this) || queueIterator.isConsumed()) {
+			if (queueIterator.registerListener(this) || queueIterator.isReleased()) {
 				return;
 			}
 		}
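
The getNextLookAhead() loop above is a poll-then-subscribe protocol: registerListener() only succeeds while the view is empty, so a false return means data arrived concurrently and the poll must be retried. An annotated restatement of the same logic:

    while (true) {
        lookAhead = queueIterator.getNextBuffer();
        if (lookAhead != null) {
            break; // look-ahead filled; the gate can now be notified
        }
        // View is empty at this instant. A false return from registerListener()
        // means a buffer showed up in between, so poll again instead of waiting.
        if (queueIterator.registerListener(this) || queueIterator.isReleased()) {
            return; // onNotification() re-runs getNextLookAhead() later
        }
    }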

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index d50ddc2..2ca2ff7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -19,13 +19,12 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +46,7 @@ public class RemoteInputChannel extends InputChannel {
 
 	private final InputChannelID id;
 
-	private final RemoteAddress producerAddress;
+	private final ConnectionID producerAddress;
 
 	private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
 
@@ -64,12 +63,11 @@ public class RemoteInputChannel extends InputChannel {
 	public RemoteInputChannel(
 			SingleInputGate gate,
 			int channelIndex,
-			ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId,
-			RemoteAddress producerAddress,
+			ResultPartitionID partitionId,
+			ConnectionID producerAddress,
 			ConnectionManager connectionManager) {
 
-		super(gate, channelIndex, producerExecutionId, partitionId);
+		super(gate, channelIndex, partitionId);
 
 		/**
 		 * This ID is used by the {@link PartitionRequestClient} to distinguish
@@ -85,13 +83,13 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException {
+	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
-			LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId);
+			LOG.debug("Requesting REMOTE queue {} of partition {}.", subpartitionIndex, partitionId);
 
 			partitionRequestClient = connectionManager.createPartitionRequestClient(producerAddress);
 
-			partitionRequestClient.requestIntermediateResultPartition(producerExecutionId, partitionId, queueIndex, this);
+			partitionRequestClient.requestIntermediateResultPartition(partitionId, subpartitionIndex, this);
 		}
 	}
 
@@ -125,7 +123,7 @@ public class RemoteInputChannel extends InputChannel {
 
 		checkIoError();
 
-		partitionRequestClient.sendTaskEvent(producerExecutionId, partitionId, event, this);
+		partitionRequestClient.sendTaskEvent(partitionId, event, this);
 	}
 
 	// ------------------------------------------------------------------------
@@ -137,6 +135,11 @@ public class RemoteInputChannel extends InputChannel {
 		return isReleased.get();
 	}
 
+	@Override
+	public void notifySubpartitionConsumed() {
+		// Nothing to do
+	}
+
 	/**
 	 * Releases all received buffers and closes the partition request client.
 	 */
@@ -160,7 +163,7 @@ public class RemoteInputChannel extends InputChannel {
 
 	@Override
 	public String toString() {
-		return "REMOTE " + id + " " + producerAddress + " " + super.toString();
+		return "RemoteInputChannel [" + partitionId + " at " + producerAddress + "]";
 	}
 
 	// ------------------------------------------------------------------------
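
The recurring change in this file, as in the other channels, is that the former (ExecutionAttemptID, IntermediateResultPartitionID) pair collapses into a single ResultPartitionID. A plausible shape for that class, inferred from the getConsumedPartitionId().getPartitionId() call in SingleInputGate below; the producerId field is an assumption standing in for the removed producerExecutionId:

    // Editorial sketch; only getPartitionId() is visible in this patch.
    public final class ResultPartitionID implements java.io.Serializable {

        private final IntermediateResultPartitionID partitionId;

        private final ExecutionAttemptID producerId; // assumed field

        public ResultPartitionID(
                IntermediateResultPartitionID partitionId,
                ExecutionAttemptID producerId) {
            this.partitionId = partitionId;
            this.producerId = producerId;
        }

        public IntermediateResultPartitionID getPartitionId() {
            return partitionId;
        }
    }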

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d981451..5b97d26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,18 +19,19 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -39,11 +40,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -51,20 +54,23 @@ import static com.google.common.base.Preconditions.checkState;
 
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
- * <p>
- * Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ *
+ * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
- * <p>
- * As an example, consider a map-reduce program, where the map operator produces data and the reduce
- * operator consumes the produced data.
+ *
+ * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * reduce operator consumes the produced data.
+ *
  * <pre>
  * +-----+              +---------------------+              +--------+
  * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
  * +-----+              +---------------------+              +--------+
  * </pre>
- * When deploying such a program in parallel, the intermediate result will be partitioned over its
+ *
+ * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
  * subpartitions.
+ *
  * <pre>
  *                            Intermediate result
  *               +-----------------------------------------+
@@ -81,10 +87,10 @@ import static com.google.common.base.Preconditions.checkState;
  *               |                      +----------------+ |              +-----------------------+
  *               +-----------------------------------------+
  * </pre>
- * In the above example, two map subtasks produce the intermediate result in parallel, resulting
- * in two partitions (Partition 1 and 2). Each of these partitions is subpartitioned into two
+ *
+ * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
  * subpartitions -- one for each parallel reduce subtask.
- * <p>
  */
 public class SingleInputGate implements InputGate {
 
@@ -102,7 +108,7 @@ public class SingleInputGate implements InputGate {
 
 	/**
 	 * The index of the consumed subpartition of each consumed partition. This index depends on the
-	 * distribution pattern and both subtask indices of the producing and consuming task.
+	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
 	 */
 	private final int consumedSubpartitionIndex;
 
@@ -128,7 +134,7 @@ public class SingleInputGate implements InputGate {
 	private boolean requestedPartitionsFlag;
 
 	/** Flag indicating whether all resources have been released. */
-	private boolean releasedResourcesFlag;
+	private volatile boolean isReleased;
 
 	/** Registered listener to forward buffer notifications to. */
 	private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>();
@@ -185,42 +191,48 @@ public class SingleInputGate implements InputGate {
 
 	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
 		synchronized (requestLock) {
-			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null &&
-					inputChannel.getClass() == UnknownInputChannel.class) {
+			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+					&& inputChannel.getClass() == UnknownInputChannel.class) {
+
 				numberOfUninitializedChannels++;
 			}
 		}
 	}
 
-	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException {
+	public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOException, InterruptedException {
 		synchronized (requestLock) {
-			if (releasedResourcesFlag) {
+			if (isReleased) {
 				// There was a race with a task failure/cancel
 				return;
 			}
 
-			final IntermediateResultPartitionID partitionId = partitionInfo.getPartitionId();
+			final IntermediateResultPartitionID partitionId = icdd.getConsumedPartitionId().getPartitionId();
 
 			InputChannel current = inputChannels.get(partitionId);
 
 			if (current.getClass() == UnknownInputChannel.class) {
+
 				UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
 
 				InputChannel newChannel;
 
-				if (partitionInfo.getProducerLocation() == PartitionInfo.PartitionLocation.REMOTE) {
-					newChannel = unknownChannel.toRemoteInputChannel(partitionInfo.getProducerAddress());
-				}
-				else if (partitionInfo.getProducerLocation() == PartitionInfo.PartitionLocation.LOCAL) {
+				ResultPartitionLocation partitionLocation = icdd.getConsumedPartitionLocation();
+
+				if (partitionLocation.isLocal()) {
 					newChannel = unknownChannel.toLocalInputChannel();
 				}
+				else if (partitionLocation.isRemote()) {
+					newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
+				}
 				else {
 					throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
 				}
 
+				LOG.debug("Updated unknown input channel to {}.", newChannel);
+
 				inputChannels.put(partitionId, newChannel);
 
-				newChannel.requestIntermediateResultPartition(consumedSubpartitionIndex);
+				newChannel.requestSubpartition(consumedSubpartitionIndex);
 
 				for (TaskEvent event : pendingEvents) {
 					newChannel.sendTaskEvent(event);
@@ -235,7 +247,7 @@ public class SingleInputGate implements InputGate {
 
 	public void releaseAllResources() throws IOException {
 		synchronized (requestLock) {
-			if (!releasedResourcesFlag) {
+			if (!isReleased) {
 				try {
 					for (InputChannel inputChannel : inputChannels.values()) {
 						try {
@@ -249,11 +261,11 @@ public class SingleInputGate implements InputGate {
 					// The buffer pool can actually be destroyed immediately after the
 					// reader received all of the data from the input channels.
 					if (bufferPool != null) {
-						bufferPool.destroy();
+						bufferPool.lazyDestroy();
 					}
 				}
 				finally {
-					releasedResourcesFlag = true;
+					isReleased = true;
 				}
 			}
 		}
@@ -284,7 +296,7 @@ public class SingleInputGate implements InputGate {
 
 			synchronized (requestLock) {
 				for (InputChannel inputChannel : inputChannels.values()) {
-					inputChannel.requestIntermediateResultPartition(consumedSubpartitionIndex);
+					inputChannel.requestSubpartition(consumedSubpartitionIndex);
 				}
 			}
 
@@ -299,13 +311,16 @@ public class SingleInputGate implements InputGate {
 	@Override
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
 
-		if (releasedResourcesFlag) {
+		if (isReleased) {
 			throw new IllegalStateException("The input has already been consumed. This indicates misuse of the input gate.");
 		}
 
 		requestPartitions();
 
-		final InputChannel currentChannel = inputChannelsWithData.take();
+		InputChannel currentChannel = null;
+		while (currentChannel == null) {
+			currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
+		}
 
 		final Buffer buffer = currentChannel.getNextBuffer();
 
@@ -322,6 +337,8 @@ public class SingleInputGate implements InputGate {
 			final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
 
 			if (event.getClass() == EndOfPartitionEvent.class) {
+				currentChannel.notifySubpartitionConsumed();
+
 				currentChannel.releaseAllResources();
 			}
 
@@ -354,58 +371,63 @@ public class SingleInputGate implements InputGate {
 	public void onAvailableBuffer(InputChannel channel) {
 		inputChannelsWithData.add(channel);
 
-		for (int i = 0; i < registeredListeners.size(); i++) {
-			registeredListeners.get(i).onEvent(this);
+		for (EventListener<InputGate> registeredListener : registeredListeners) {
+			registeredListener.onEvent(this);
 		}
 	}
 
 	// ------------------------------------------------------------------------
 
-	public static SingleInputGate create(NetworkEnvironment networkEnvironment, PartitionConsumerDeploymentDescriptor desc) {
-		// The consumed intermediate data set (all partitions are part of this data set)
-		final IntermediateDataSetID resultId = desc.getResultId();
-		// The queue to request from each consumed partition
-		final int queueIndex = desc.getQueueIndex();
-		// There is one input channel for each consumed partition
-		final PartitionInfo[] partitions = desc.getPartitions();
-		final int numberOfInputChannels = partitions.length;
-		final SingleInputGate reader = new SingleInputGate(resultId, queueIndex, numberOfInputChannels);
-		// Create input channels
-		final InputChannel[] inputChannels = new InputChannel[numberOfInputChannels];
-		int channelIndex = 0;
-		for (PartitionInfo partition : partitions) {
-			final ExecutionAttemptID producerExecutionId = partition.getProducerExecutionId();
-			final IntermediateResultPartitionID partitionId = partition.getPartitionId();
-			final PartitionInfo.PartitionLocation producerLocation = partition.getProducerLocation();
-			switch (producerLocation) {
-				case LOCAL:
-					LOG.debug("Create LocalInputChannel for {}.", partition);
-
-					inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId, partitionId,
-							networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());
-
-					break;
-				case REMOTE:
-					LOG.debug("Create RemoteInputChannel for {}.", partition);
-
-					final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(),
-							"Missing producer address for remote intermediate result partition.");
-
-					inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId, partitionId,
-							producerAddress, networkEnvironment.getConnectionManager());
-
-					break;
-				case UNKNOWN:
-					LOG.debug("Create UnknownInputChannel for {}.", partition);
-
-					inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId, partitionId,
-							networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager());
-
-					break;
+	/**
+	 * Creates an input gate and all of its input channels.
+	 */
+	public static SingleInputGate create(
+			InputGateDeploymentDescriptor igdd,
+			NetworkEnvironment networkEnvironment) {
+
+		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
+
+		final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
+		checkArgument(consumedSubpartitionIndex >= 0);
+
+		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+
+		final SingleInputGate inputGate = new SingleInputGate(
+				consumedResultId, consumedSubpartitionIndex, icdd.length);
+
+		// Create the input channels. There is one input channel for each consumed partition.
+		final InputChannel[] inputChannels = new InputChannel[icdd.length];
+
+		for (int i = 0; i < inputChannels.length; i++) {
+
+			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
+			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
+
+			if (partitionLocation.isLocal()) {
+				inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
+						networkEnvironment.getPartitionManager(),
+						networkEnvironment.getTaskEventDispatcher());
+			}
+			else if (partitionLocation.isRemote()) {
+				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
+						partitionLocation.getConnectionId(),
+						networkEnvironment.getConnectionManager());
+			}
+			else if (partitionLocation.isUnknown()) {
+				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
+						networkEnvironment.getPartitionManager(),
+						networkEnvironment.getTaskEventDispatcher(),
+						networkEnvironment.getConnectionManager());
+			}
+			else {
+				throw new IllegalStateException("Unexpected partition location.");
 			}
-			reader.setInputChannel(partitionId, inputChannels[channelIndex]);
-			channelIndex++;
+
+			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
-		return reader;
+
+		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+
+		return inputGate;
 	}
 }
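
A hedged end-to-end sketch of how a task might drive the gate produced by the factory above; setBufferPool() and the BufferOrEvent accessors are assumptions consistent with the bufferPool field and the EventSerializer usage in this file:

    SingleInputGate gate = SingleInputGate.create(igdd, networkEnvironment);
    gate.setBufferPool(bufferPool); // assumed setter for the bufferPool field

    boolean moreInputExpected = true; // placeholder termination flag
    while (moreInputExpected) {
        // Blocks until some channel has data queued.
        BufferOrEvent boe = gate.getNextBufferOrEvent();

        if (boe.isBuffer()) {
            // feed boe.getBuffer() into the record deserializer
        }
        else if (boe.getEvent().getClass() == EndOfPartitionEvent.class) {
            // one channel is done; clear the flag once all channels are done
        }
    }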

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index f8e42ba..4bde292 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 
 import java.io.IOException;
 
@@ -38,21 +37,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class UnknownInputChannel extends InputChannel {
 
-	private final IntermediateResultPartitionManager partitionManager;
+	private final ResultPartitionManager partitionManager;
 
 	private final TaskEventDispatcher taskEventDispatcher;
 
 	private final ConnectionManager connectionManager;
 
 	public UnknownInputChannel(
-			SingleInputGate gate, int channelIndex,
-			ExecutionAttemptID producerExecutionId,
-			IntermediateResultPartitionID partitionId,
-			IntermediateResultPartitionManager partitionManager,
+			SingleInputGate gate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
 			ConnectionManager connectionManager) {
 
-		super(gate, channelIndex, producerExecutionId, partitionId);
+		super(gate, channelIndex, partitionId);
 
 		this.partitionManager = partitionManager;
 		this.taskEventDispatcher = taskEventDispatcher;
@@ -60,7 +59,7 @@ public class UnknownInputChannel extends InputChannel {
 	}
 
 	@Override
-	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
+	public void requestSubpartition(int subpartitionIndex) throws IOException {
 		// Nothing to do here
 	}
 
@@ -89,24 +88,28 @@ public class UnknownInputChannel extends InputChannel {
 	}
 
 	@Override
+	public void notifySubpartitionConsumed() {
+	}
+
+	@Override
 	public void releaseAllResources() throws IOException {
 		// Nothing to do here
 	}
 
 	@Override
 	public String toString() {
-		return "UNKNOWN " + super.toString();
+		return "UnknownInputChannel [" + partitionId + "]";
 	}
 
 	// ------------------------------------------------------------------------
 	// Graduation to a local or remote input channel at runtime
 	// ------------------------------------------------------------------------
 
-	public RemoteInputChannel toRemoteInputChannel(RemoteAddress producerAddress) {
-		return new RemoteInputChannel(inputGate, channelIndex, producerExecutionId, partitionId, checkNotNull(producerAddress), connectionManager);
+	public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) {
+		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager);
 	}
 
 	public LocalInputChannel toLocalInputChannel() {
-		return new LocalInputChannel(inputGate, channelIndex, producerExecutionId, partitionId, partitionManager, taskEventDispatcher);
+		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher);
 	}
 }
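
The two conversion methods at the bottom are what updateInputChannel() in SingleInputGate calls once the producer's location has been resolved. Condensed from that method, the graduation step is:

    InputChannel newChannel;

    if (partitionLocation.isLocal()) {
        newChannel = unknownChannel.toLocalInputChannel();
    }
    else if (partitionLocation.isRemote()) {
        newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
    }
    else {
        throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
    }

    newChannel.requestSubpartition(consumedSubpartitionIndex);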

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IllegalQueueIteratorRequestException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IllegalQueueIteratorRequestException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IllegalQueueIteratorRequestException.java
deleted file mode 100644
index 80284c4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IllegalQueueIteratorRequestException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.queue;
-
-import java.io.IOException;
-
-public class IllegalQueueIteratorRequestException extends IOException {
-
-	private static final long serialVersionUID = 8381253563445306324L;
-
-	public IllegalQueueIteratorRequestException() {
-	}
-
-	public IllegalQueueIteratorRequestException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueue.java
deleted file mode 100644
index 278e2a1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueue.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.queue;
-
-import com.google.common.base.Optional;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
-import java.io.IOException;
-
-public interface IntermediateResultPartitionQueue {
-
-	// ------------------------------------------------------------------------
-	// Produce
-	// ------------------------------------------------------------------------
-
-	void add(Buffer buffer) throws IOException;
-
-	void finish() throws IOException;
-
-	void discard() throws IOException;
-
-	// ------------------------------------------------------------------------
-	// Consume
-	// ------------------------------------------------------------------------
-
-	IntermediateResultPartitionQueueIterator getQueueIterator(Optional<BufferProvider> bufferProvider) throws IllegalQueueIteratorRequestException, IOException;
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	boolean isFinished();
-
-	int recycleBuffers() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueueIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueueIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueueIterator.java
deleted file mode 100644
index 9fbb331..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/IntermediateResultPartitionQueueIterator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.queue;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-
-public interface IntermediateResultPartitionQueueIterator {
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns whether this iterator has been fully consumed, e.g. no more data
-	 * or queue has been discarded.
-	 */
-	boolean isConsumed();
-
-	// ------------------------------------------------------------------------
-	// Consume
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the next {@link Buffer} instance of this queue iterator.
-	 * <p>
-	 * If there is currently no instance available, it will return <code>null</code>.
-	 * This might happen for example when a pipelined queue producer is slower
-	 * than the consumer or a spilled queue needs to read in more data.
-	 * <p>
-	 * <strong>Important</strong>: The consumer has to make sure that each
-	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
-	 * after it has been consumed.
-	 */
-	Buffer getNextBuffer() throws IOException;
-
-	/**
-	 * Discards the consumption of this queue iterator.
-	 */
-	void discard() throws IOException;
-
-	/**
-	 * Subscribes to data availability notifications.
-	 * <p>
-	 * Returns whether the subscription was successful. A subscription fails,
-	 * if there is data available.
-	 */
-	boolean subscribe(NotificationListener listener) throws AlreadySubscribedException;
-
-	// ------------------------------------------------------------------------
-
-	public class AlreadySubscribedException extends IOException {
-
-		private static final long serialVersionUID = -5583394817361970668L;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java
deleted file mode 100644
index 5d562e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.queue;
-
-import com.google.common.base.Optional;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-/**
- * An in-memory only queue, which can be consumed once by a single consumer.
- */
-public class PipelinedPartitionQueue implements IntermediateResultPartitionQueue {
-
-	final Queue<Buffer> queue = new ArrayDeque<Buffer>();
-
-	boolean hasFinishedProduce;
-
-	private NotificationListener listener;
-
-	private boolean hasBeenDiscarded;
-
-	private boolean hasConsumer;
-
-	@Override
-	public void add(Buffer buffer) {
-		synchronized (queue) {
-			if (!hasFinishedProduce) {
-				queue.add(buffer);
-
-				maybeNotifyListener();
-			}
-			else if (!buffer.isRecycled()) {
-					buffer.recycle();
-			}
-		}
-	}
-
-	@Override
-	public boolean isFinished() {
-		synchronized (queue) {
-			return hasFinishedProduce;
-		}
-	}
-
-	@Override
-	public void finish() {
-		synchronized (queue) {
-			if (hasFinishedProduce) {
-				return;
-			}
-
-			queue.add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
-
-			hasFinishedProduce = true;
-
-			maybeNotifyListener();
-		}
-	}
-
-	@Override
-	public int recycleBuffers() {
-		// Nothing to do here. Buffers are recycled when they are consumed.
-		return 0;
-	}
-
-	@Override
-	public void discard() {
-		synchronized (queue) {
-			Buffer buffer;
-			while ((buffer = queue.poll()) != null) {
-				if (!buffer.isRecycled()) {
-					buffer.recycle();
-				}
-			}
-
-			hasFinishedProduce = true;
-			hasBeenDiscarded = true;
-
-			maybeNotifyListener();
-		}
-	}
-
-	@Override
-	public IntermediateResultPartitionQueueIterator getQueueIterator(Optional<BufferProvider> bufferProvider)
-			throws IllegalQueueIteratorRequestException {
-		synchronized (queue) {
-			if (hasBeenDiscarded) {
-				throw new IllegalQueueIteratorRequestException("Queue has been discarded during produce phase.");
-			}
-
-			if (hasConsumer) {
-				throw new IllegalQueueIteratorRequestException("Consumable once queue has been consumed/is being consumed.");
-			}
-
-			hasConsumer = true;
-
-			return new PipelinedPartitionQueueIterator(this);
-		}
-	}
-
-	// Call in synchronized scope
-	private void maybeNotifyListener() {
-		NotificationListener consumer = listener;
-		if (consumer != null) {
-			listener = null;
-
-			// TODO This is dangerous with the locks. Every listener needs to make sure not to query the queue again :S
-			consumer.onNotification();
-		}
-	}
-
-	private static class PipelinedPartitionQueueIterator implements IntermediateResultPartitionQueueIterator {
-
-		private final PipelinedPartitionQueue partitionQueue;
-
-		private boolean isDiscarded;
-
-		private PipelinedPartitionQueueIterator(PipelinedPartitionQueue partitionQueue) {
-			this.partitionQueue = partitionQueue;
-		}
-
-		@Override
-		public boolean isConsumed() {
-			synchronized (partitionQueue.queue) {
-				return (partitionQueue.isFinished() && partitionQueue.queue.isEmpty()) || partitionQueue.hasBeenDiscarded;
-			}
-		}
-
-		@Override
-		public Buffer getNextBuffer() {
-			synchronized (partitionQueue.queue) {
-				return partitionQueue.queue.poll();
-			}
-		}
-
-		@Override
-		public void discard() {
-			synchronized (partitionQueue.queue) {
-				if (!isDiscarded) {
-					partitionQueue.discard();
-
-					isDiscarded = true;
-				}
-			}
-		}
-
-		@Override
-		public boolean subscribe(NotificationListener listener) throws AlreadySubscribedException {
-			synchronized (partitionQueue.queue) {
-				if (isConsumed() || !partitionQueue.queue.isEmpty()) {
-					return false;
-				}
-
-				if (partitionQueue.listener == null) {
-					partitionQueue.listener = listener;
-					return true;
-				}
-
-				throw new AlreadySubscribedException();
-			}
-		}
-	}
-}
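
For orientation, the queue API deleted here maps onto the new view API roughly as follows (inferred from the LocalInputChannel changes above):

    // IntermediateResultPartitionQueueIterator   ->  ResultSubpartitionView
    //   getNextBuffer()                          ->  getNextBuffer()
    //   subscribe(NotificationListener)          ->  registerListener(NotificationListener)
    //   discard()                                ->  releaseAllResources()
    //   isConsumed()                             ->  isReleased()
    //
    // IntermediateResultPartitionManager         ->  ResultPartitionManager
    //   getIntermediateResultPartitionIterator() ->  getSubpartition()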

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
deleted file mode 100644
index 5de42b1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ /dev/null
@@ -1,637 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.util.StringUtils;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.Random;
-
-/**
- * @param <T> The type of the record to be deserialized.
- */
-public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
-	
-	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
-	
-	private final NonSpanningWrapper nonSpanningWrapper;
-	
-	private final SpanningWrapper spanningWrapper;
-
-	private Buffer currentBuffer;
-
-	public SpillingAdaptiveSpanningRecordDeserializer() {
-		
-		String tempDirString = GlobalConfiguration.getString(
-				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-		String[] directories = tempDirString.split(",|" + File.pathSeparator);
-		
-		this.nonSpanningWrapper = new NonSpanningWrapper();
-		this.spanningWrapper = new SpanningWrapper(directories);
-	}
-
-	@Override
-	public void setNextBuffer(Buffer buffer) throws IOException {
-		currentBuffer = buffer;
-
-		MemorySegment segment = buffer.getMemorySegment();
-		int numBytes = buffer.getSize();
-
-		setNextMemorySegment(segment, numBytes);
-	}
-
-	@Override
-	public Buffer getCurrentBuffer () {
-		Buffer tmp = currentBuffer;
-		currentBuffer = null;
-		return tmp;
-	}
-	
-	@Override
-	public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
-		// check if some spanning record deserialization is pending
-		if (this.spanningWrapper.getNumGatheredBytes() > 0) {
-			this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes);
-		}
-		else {
-			this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
-		}
-	}
-	
-	@Override
-	public DeserializationResult getNextRecord(T target) throws IOException {
-		// always check the non-spanning wrapper first.
-		// this should be the majority of the cases for small records
-		// for large records, this portion of the work is very small in comparison anyways
-		
-		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-		
-		// check if we can get a full length;
-		if (nonSpanningRemaining >= 4) {
-			int len = this.nonSpanningWrapper.readInt();
-
-			if (len <= nonSpanningRemaining - 4) {
-				// we can get a full record from here
-				target.read(this.nonSpanningWrapper);
-				
-				return (this.nonSpanningWrapper.remaining() == 0) ?
-					DeserializationResult.LAST_RECORD_FROM_BUFFER :
-					DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-			} else {
-				// we got the length, but we need the rest from the spanning deserializer
-				// and need to wait for more buffers
-				this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
-				this.nonSpanningWrapper.clear();
-				return DeserializationResult.PARTIAL_RECORD;
-			}
-		} else if (nonSpanningRemaining > 0) {
-			// we have an incomplete length
-			// add our part of the length to the length buffer
-			this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
-			this.nonSpanningWrapper.clear();
-			return DeserializationResult.PARTIAL_RECORD;
-		}
-		
-		// spanning record case
-		if (this.spanningWrapper.hasFullRecord()) {
-			// get the full record
-			target.read(this.spanningWrapper.getInputView());
-			
-			// move the remainder to the non-spanning wrapper
-			// this does not copy it, only sets the memory segment
-			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
-			this.spanningWrapper.clear();
-			
-			return (this.nonSpanningWrapper.remaining() == 0) ?
-				DeserializationResult.LAST_RECORD_FROM_BUFFER :
-				DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-		} else {
-			return DeserializationResult.PARTIAL_RECORD;
-		}
-	}
-
-	@Override
-	public void clear() {
-		this.nonSpanningWrapper.clear();
-		this.spanningWrapper.clear();
-	}
-
-	@Override
-	public boolean hasUnfinishedData() {
-		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	
-	private static final class NonSpanningWrapper implements DataInputView {
-		
-		private MemorySegment segment;
-		
-		private int limit;
-		
-		private int position;
-		
-		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
-		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-		
-		int remaining() {
-			return this.limit - this.position;
-		}
-		
-		void clear() {
-			this.segment = null;
-			this.limit = 0;
-			this.position = 0;
-		}
-		
-		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
-			this.segment = seg;
-			this.position = position;
-			this.limit = leftOverLimit;
-		}
-		
-		// -------------------------------------------------------------------------------------------------------------
-		//                                       DataInput specific methods
-		// -------------------------------------------------------------------------------------------------------------
-		
-		@Override
-		public final void readFully(byte[] b) throws IOException {
-			readFully(b, 0, b.length);
-		}
-
-		@Override
-		public final void readFully(byte[] b, int off, int len) throws IOException {
-			if (off < 0 || len < 0 || off + len > b.length) {
-				throw new IndexOutOfBoundsException();
-			}
-			
-			this.segment.get(this.position, b, off, len);
-			this.position += len;
-		}
-
-		@Override
-		public final boolean readBoolean() throws IOException {
-			return readByte() == 1;
-		}
-
-		@Override
-		public final byte readByte() throws IOException {
-			return this.segment.get(this.position++);
-		}
-
-		@Override
-		public final int readUnsignedByte() throws IOException {
-			return readByte() & 0xff;
-		}
-
-		@Override
-		public final short readShort() throws IOException {
-			final short v = this.segment.getShort(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readUnsignedShort() throws IOException {
-			final int v = this.segment.getShort(this.position) & 0xffff;
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final char readChar() throws IOException  {
-			final char v = this.segment.getChar(this.position);
-			this.position += 2;
-			return v;
-		}
-
-		@Override
-		public final int readInt() throws IOException {
-			final int v = this.segment.getIntBigEndian(this.position);
-			this.position += 4;
-			return v;
-		}
-
-		@Override
-		public final long readLong() throws IOException {
-			final long v = this.segment.getLongBigEndian(this.position);
-			this.position += 8;
-			return v;
-		}
-
-		@Override
-		public final float readFloat() throws IOException {
-			return Float.intBitsToFloat(readInt());
-		}
-
-		@Override
-		public final double readDouble() throws IOException {
-			return Double.longBitsToDouble(readLong());
-		}
-
-		@Override
-		public final String readLine() throws IOException {
-			final StringBuilder bld = new StringBuilder(32);
-			
-			try {
-				int b;
-				while ((b = readUnsignedByte()) != '\n') {
-					if (b != '\r') {
-						bld.append((char) b);
-					}
-				}
-			}
-			catch (EOFException eofex) {}
-
-			if (bld.length() == 0) {
-				return null;
-			}
-			
-			// trim a trailing carriage return
-			int len = bld.length();
-			if (len > 0 && bld.charAt(len - 1) == '\r') {
-				bld.setLength(len - 1);
-			}
-			return bld.toString();
-		}
-
-		@Override
-		public final String readUTF() throws IOException {
-			final int utflen = readUnsignedShort();
-			
-			final byte[] bytearr;
-			final char[] chararr;
-			
-			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
-				bytearr = new byte[utflen];
-				this.utfByteBuffer = bytearr;
-			} else {
-				bytearr = this.utfByteBuffer;
-			}
-			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
-				chararr = new char[utflen];
-				this.utfCharBuffer = chararr;
-			} else {
-				chararr = this.utfCharBuffer;
-			}
-
-			int c, char2, char3;
-			int count = 0;
-			int chararr_count = 0;
-
-			readFully(bytearr, 0, utflen);
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				if (c > 127) {
-					break;
-				}
-				count++;
-				chararr[chararr_count++] = (char) c;
-			}
-
-			while (count < utflen) {
-				c = (int) bytearr[count] & 0xff;
-				switch (c >> 4) {
-				case 0:
-				case 1:
-				case 2:
-				case 3:
-				case 4:
-				case 5:
-				case 6:
-				case 7:
-					count++;
-					chararr[chararr_count++] = (char) c;
-					break;
-				case 12:
-				case 13:
-					count += 2;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 1];
-					if ((char2 & 0xC0) != 0x80) {
-						throw new UTFDataFormatException("malformed input around byte " + count);
-					}
-					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-					break;
-				case 14:
-					count += 3;
-					if (count > utflen) {
-						throw new UTFDataFormatException("malformed input: partial character at end");
-					}
-					char2 = (int) bytearr[count - 2];
-					char3 = (int) bytearr[count - 1];
-					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-					}
-					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-					break;
-				default:
-					throw new UTFDataFormatException("malformed input around byte " + count);
-				}
-			}
-			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararr_count);
-		}
-		
-		@Override
-		public final int skipBytes(int n) throws IOException {
-			if (n < 0) {
-				throw new IllegalArgumentException();
-			}
-			
-			int toSkip = Math.min(n, remaining());
-			this.position += toSkip;
-			return toSkip;
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			int skippedBytes = skipBytes(numBytes);
-
-			if(skippedBytes < numBytes){
-				throw new EOFException("Could not skip " + numBytes + " bytes.");
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			if(b == null){
-				throw new NullPointerException("Byte array b cannot be null.");
-			}
-
-			if(off < 0){
-				throw new IllegalArgumentException("The offset off cannot be negative.");
-			}
-
-			if(len < 0){
-				throw new IllegalArgumentException("The length len cannot be negative.");
-			}
-
-			int toRead = Math.min(len, remaining());
-			this.segment.get(this.position,b,off, toRead);
-			this.position += toRead;
-
-			return toRead;
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	
-	private static final class SpanningWrapper {
-		
-		private final byte[] initialBuffer = new byte[1024];
-		
-		private final String[] tempDirs;
-		
-		private final Random rnd = new Random();
-
-		private final DataInputDeserializer serializationReadBuffer;
-
-		private final ByteBuffer lengthBuffer;
-		
-		private FileChannel spillingChannel;
-		
-		private byte[] buffer;
-
-		private int recordLength;
-		
-		private int accumulatedRecordBytes;
-
-		private MemorySegment leftOverData;
-
-		private int leftOverStart;
-
-		private int leftOverLimit;
-		
-		private File spillFile;
-		
-		private InputViewDataInputStreamWrapper spillFileReader;
-		
-		public SpanningWrapper(String[] tempDirs) {
-			this.tempDirs = tempDirs;
-			
-			this.lengthBuffer = ByteBuffer.allocate(4);
-			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
-			this.recordLength = -1;
-
-			this.serializationReadBuffer = new DataInputDeserializer();
-			this.buffer = initialBuffer;
-		}
-		
-		private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
-			// set the length and copy what is available to the buffer
-			this.recordLength = nextRecordLength;
-			
-			final int numBytesChunk = partial.remaining();
-			
-			if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-				// create a spilling channel and put the data there
-				this.spillingChannel = createSpillingChannel();
-				
-				ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
-				this.spillingChannel.write(toWrite);
-			}
-			else {
-				// collect in memory
-				ensureBufferCapacity(numBytesChunk);
-				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
-			}
-			
-			this.accumulatedRecordBytes = numBytesChunk;
-		}
-		
-		private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
-			// copy what we have to the length buffer
-			partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
-		}
-		
-		private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
-			int segmentPosition = 0;
-			
-			// check where to go. if we have a partial length, we need to complete it first
-			if (this.lengthBuffer.position() > 0) {
-				int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
-				segment.get(0, this.lengthBuffer, toPut);
-				
-				// did we complete the length?
-				if (this.lengthBuffer.hasRemaining()) {
-					return;
-				} else {
-					this.recordLength = this.lengthBuffer.getInt(0);
-					this.lengthBuffer.clear();
-					segmentPosition = toPut;
-					
-					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
-						this.spillingChannel = createSpillingChannel();
-					}
-				}
-			}
-
-			// copy as much as we need or can for this next spanning record
-			int needed = this.recordLength - this.accumulatedRecordBytes;
-			int available = numBytesInSegment - segmentPosition;
-			int toCopy = Math.min(needed, available);
-
-			if (spillingChannel != null) {
-				// spill to file
-				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
-				this.spillingChannel.write(toWrite);
-			}
-			else {
-				ensureBufferCapacity(accumulatedRecordBytes + toCopy);
-				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
-			}
-			
-			this.accumulatedRecordBytes += toCopy;
-			
-			if (toCopy < available) {
-				// there is more data in the segment
-				this.leftOverData = segment;
-				this.leftOverStart = segmentPosition + toCopy;
-				this.leftOverLimit = numBytesInSegment;
-			}
-			
-			if (accumulatedRecordBytes == recordLength) {
-				// we have the full record
-				if (spillingChannel == null) {
-					this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
-				}
-				else {
-					spillingChannel.close();
-					
-					DataInputStream inStream = new DataInputStream(new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024));
-					this.spillFileReader = new InputViewDataInputStreamWrapper(inStream);
-				}
-			}
-		}
-		
-		private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
-			deserializer.clear();
-			
-			if (leftOverData != null) {
-				deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
-			}
-		}
-		
-		private boolean hasFullRecord() {
-			return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
-		}
-		
-		private int getNumGatheredBytes() {
-			return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
-		}
-
-		public void clear() {
-			this.buffer = initialBuffer;
-			this.serializationReadBuffer.releaseArrays();
-
-			this.recordLength = -1;
-			this.lengthBuffer.clear();
-			this.leftOverData = null;
-			this.accumulatedRecordBytes = 0;
-			
-			if (spillingChannel != null) {
-				try {
-					spillingChannel.close();
-				}
-				catch (Throwable t) {
-					// ignore
-				}
-				spillingChannel = null;
-			}
-			if (spillFileReader != null) {
-				try {
-					spillFileReader.close();
-				}
-				catch (Throwable t) {
-					// ignore
-				}
-				spillFileReader = null;
-			}
-			if (spillFile != null) {
-				spillFile.delete();
-				spillFile = null;
-			}
-		}
-		
-		public DataInputView getInputView() {
-			if (spillFileReader == null) {
-				return serializationReadBuffer; 
-			}
-			else {
-				return spillFileReader;
-			}
-		}
-		
-		private void ensureBufferCapacity(int minLength) {
-			if (buffer.length < minLength) {
-				byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
-				System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes);
-				buffer = newBuffer;
-			}
-		}
-		
-		@SuppressWarnings("resource")
-		private FileChannel createSpillingChannel() throws IOException {
-			if (spillFile != null) {
-				throw new IllegalStateException("Spilling file already exists.");
-			}
-			
-			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-			
-			return new RandomAccessFile(spillFile, "rw").getChannel();
-		}
-		
-		private static String randomString(Random random) {
-			final byte[] bytes = new byte[20];
-			random.nextBytes(bytes);
-			return StringUtils.byteToHexString(bytes);
-		}
-	}
-}
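
The deleted deserializer is the origin of the "spilling adaptive" name: spanning records are accumulated in a growable heap byte array, but once the declared record length exceeds THRESHOLD_FOR_SPILLING (5 MiB) the bytes are redirected to a temporary file, so one oversized record cannot exhaust the heap. (Judging by the api.serialization imports elsewhere in this commit, the class moves packages rather than disappearing.) A self-contained sketch of just that decision, with illustrative names and the threshold mirrored from the deleted constant:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.Arrays;

    final class AdaptiveRecordBuffer {

        private static final int SPILL_THRESHOLD = 5 * 1024 * 1024; // 5 MiB

        private byte[] buffer = new byte[1024];

        private int accumulated;

        private FileChannel spillChannel; // non-null only once a record has spilled

        void add(ByteBuffer chunk, int recordLength) throws IOException {
            if (spillChannel == null && recordLength > SPILL_THRESHOLD) {
                // Large record: move what has been gathered so far to disk.
                File spillFile = File.createTempFile("record", ".spill");
                spillChannel = new RandomAccessFile(spillFile, "rw").getChannel();
                spillChannel.write(ByteBuffer.wrap(buffer, 0, accumulated));
            }

            int numBytes = chunk.remaining();
            if (spillChannel != null) {
                // Spilled record: keep streaming chunks straight to the file.
                spillChannel.write(chunk);
            } else {
                // Small record: grow the heap buffer geometrically and copy the chunk in.
                if (buffer.length < accumulated + numBytes) {
                    buffer = Arrays.copyOf(buffer, Math.max(accumulated + numBytes, buffer.length * 2));
                }
                chunk.get(buffer, accumulated, numBytes);
            }
            accumulated += numBytes;
        }
    }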

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index 5896dcf..ca1ade9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -46,7 +46,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 	private ArrayDeque<MemorySegment> fullBuffers;
 
-	private BlockChannelWriter currentWriter;
+	private BlockChannelWriter<MemorySegment> currentWriter;
 
 	private final IOManager ioManager;
 
@@ -182,7 +182,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 			// now close the writer and create the reader
 			currentWriter.close();
-			final BlockChannelReader reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());
+			final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());
 
 			// gather some memory segments to circulate while reading back the data
 			final List<MemorySegment> readSegments = new ArrayList<MemorySegment>();
@@ -263,14 +263,14 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 		private final Deque<MemorySegment> fullBufferSource;
 
-		private final BlockChannelReader spilledBufferSource;
+		private final BlockChannelReader<MemorySegment> spilledBufferSource;
 
 		private int spilledBuffersRemaining;
 
 		private int requestsRemaining;
 
 		private ReadEnd(MemorySegment firstMemSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget,
-										Deque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
+										Deque<MemorySegment> fullBufferSource, BlockChannelReader<MemorySegment> spilledBufferSource,
 										List<MemorySegment> emptyBuffers, int numBuffersSpilled)
 			throws IOException {
 			super(firstMemSegment, firstMemSegment.getInt(0), HEADER_LENGTH);
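
The SerializedUpdateBuffer hunks are part of a commit-wide cleanup that gives BlockChannelWriter and BlockChannelReader a type parameter for the element they transport, replacing raw types; the same change shows up again in HashPartition further down. At a call site the difference is only in the declaration, but the compiler now checks the element type:

    // Before: raw type; "MemorySegment" was only a convention.
    BlockChannelReader reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());

    // After: parameterized; mismatched element types no longer compile.
    BlockChannelReader<MemorySegment> typedReader =
            ioManager.createBlockChannelReader(currentWriter.getChannelID());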

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index b8e4857..abaf311 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,7 +91,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 	private TypeSerializerFactory<X> solutionTypeSerializer;
 
-	private BufferWriter toSync;
+	private ResultPartitionWriter toSync;
 
 	private int initialSolutionSetInput; // undefined for bulk iterations
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index b9e1eac..609ed3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -318,31 +319,31 @@ public class AbstractJobVertex implements java.io.Serializable {
 	public IntermediateDataSet createAndAddResultDataSet() {
 		return createAndAddResultDataSet(new IntermediateDataSetID());
 	}
-	
+
 	public IntermediateDataSet createAndAddResultDataSet(IntermediateDataSetID id) {
-		IntermediateDataSet result = new IntermediateDataSet(id, this);
+		IntermediateDataSet result = new IntermediateDataSet(id, ResultPartitionType.PIPELINED, this);
 		this.results.add(result);
 		return result;
 	}
-	
+
 	public void connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
 		JobEdge edge = new JobEdge(dataSet, this, distPattern);
 		this.inputs.add(edge);
 		dataSet.addConsumer(edge);
 	}
-	
+
 	public void connectNewDataSetAsInput(AbstractJobVertex input, DistributionPattern distPattern) {
 		IntermediateDataSet dataSet = input.createAndAddResultDataSet();
 		JobEdge edge = new JobEdge(dataSet, this, distPattern);
 		this.inputs.add(edge);
 		dataSet.addConsumer(edge);
 	}
-	
+
 	public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
 		JobEdge edge = new JobEdge(dataSetId, this, distPattern);
 		this.inputs.add(edge);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public boolean isInputVertex() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index c8415de..86888e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * An intermediate data set is the data set produced by an operator - either a
  * source or any intermediate operation.
@@ -38,6 +42,9 @@ public class IntermediateDataSet implements java.io.Serializable {
 	private final AbstractJobVertex producer;		// the operation that produced this data set
 	
 	private final List<JobEdge> consumers = new ArrayList<JobEdge>();
+
+	// The type of partition to use at runtime
+	private final ResultPartitionType resultType;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -46,12 +53,13 @@ public class IntermediateDataSet implements java.io.Serializable {
 	}
 	
 	public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
-		if (id == null || producer == null) {
-			throw new NullPointerException();
-		}
-		
-		this.id = id;
-		this.producer = producer;
+		this(id, ResultPartitionType.PIPELINED, producer);
+	}
+
+	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, AbstractJobVertex producer) {
+		this.id = checkNotNull(id);
+		this.producer = checkNotNull(producer);
+		this.resultType = checkNotNull(resultType);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -67,6 +75,10 @@ public class IntermediateDataSet implements java.io.Serializable {
 	public List<JobEdge> getConsumers() {
 		return this.consumers;
 	}
+
+	public ResultPartitionType getResultType() {
+		return resultType;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	

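Taken together with the AbstractJobVertex change above, the partition type is now an explicit property of the data set, with the old two-argument constructor delegating to a PIPELINED default. A hedged usage sketch, given some AbstractJobVertex vertex (ResultPartitionType.BLOCKING is assumed by analogy with the enum deleted below):

    // Default helper: a pipelined result, as before.
    IntermediateDataSet pipelined = vertex.createAndAddResultDataSet();

    // Explicit type: the blocking variant this commit introduces.
    IntermediateDataSet blocking = new IntermediateDataSet(
            new IntermediateDataSetID(), ResultPartitionType.BLOCKING, vertex);
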
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionType.java
deleted file mode 100644
index 25d057e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-public enum IntermediateResultPartitionType {
-
-	BLOCKING(true, false, false),
-
-	PIPELINED(false, true, true),
-	PIPELINED_PERSISTENT(true, true, true);
-
-	private final boolean isPersistent;
-
-	private final boolean isPipelined;
-
-	private final boolean hasBackPressure;
-
-	IntermediateResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) {
-		this.isPersistent = isPersistent;
-		this.isPipelined = isPipelined;
-		this.hasBackPressure = hasBackPressure;
-	}
-
-	public boolean hasBackPressure() {
-		return hasBackPressure;
-	}
-
-	public boolean isPipelined() {
-		return isPipelined;
-	}
-
-	public boolean isPersistent() {
-		return isPersistent;
-	}
-}
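
The enum is not gone: the imports added to AbstractJobVertex and IntermediateDataSet above pull in ResultPartitionType from the io.network.partition package, which takes over this role. Its source is not part of these hunks, so the following consumer-side sketch assumes the replacement keeps the deleted enum's accessors:

    ResultPartitionType type = dataSet.getResultType();

    if (type.isPipelined()) {
        // consumers may start while the producer is still running
    } else {
        // BLOCKING: the result must be fully produced before it is consumed
    }

    boolean producerThrottled = type.hasBackPressure(); // false for BLOCKING results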

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
index 013a733..63f2fd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TempBarrier.java
@@ -166,6 +166,7 @@ public class TempBarrier<T> implements CloseableInputProvider<T> {
 			this.buffer = buffer;
 		}
 		
+		@Override
 		public void run() {
 			final MutableObjectIterator<T> input = this.input;
 			final TypeSerializer<T> serializer = this.serializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 14e4ae6..cee9ebb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -93,9 +93,9 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	
 	// ------------------------------------------ Spilling ----------------------------------------------
 	
-	private BlockChannelWriter buildSideChannel;		// the channel writer for the build side, if partition is spilled
+	private BlockChannelWriter<MemorySegment> buildSideChannel;		// the channel writer for the build side, if partition is spilled
 	
-	protected BlockChannelWriter probeSideChannel;		// the channel writer from the probe side, if partition is spilled
+	protected BlockChannelWriter<MemorySegment> probeSideChannel;		// the channel writer from the probe side, if partition is spilled
 	
 	// ------------------------------------------ Restoring ----------------------------------------------
 	
@@ -219,12 +219,12 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		return this.probeSideRecordCounter;
 	}
 
-	public BlockChannelWriter getBuildSideChannel() {
+	public BlockChannelWriter<MemorySegment> getBuildSideChannel() {
 		return this.buildSideChannel;
 	}
 	
 	
-	public BlockChannelWriter getProbeSideChannel() {
+	public BlockChannelWriter<MemorySegment> getProbeSideChannel() {
 		return this.probeSideChannel;
 	}
 	
@@ -506,7 +506,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		
 		private final MemorySegmentSource memSource;
 		
-		private BlockChannelWriter writer;
+		private BlockChannelWriter<MemorySegment> writer;
 		
 		private int currentBlockNumber;
 		
@@ -553,7 +553,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			return this.currentBlockNumber + 1;
 		}
 		
-		int spill(BlockChannelWriter writer) throws IOException
+		int spill(BlockChannelWriter<MemorySegment> writer) throws IOException
 		{
 			this.writer = writer;
 			final int numSegments = this.targetList.size();

