flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [12/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
Date Wed, 18 Mar 2015 16:49:01 GMT
[FLINK-1350] [runtime] Add blocking result partition variant

- Renames runtime intermediate result classes:
  a) Removes "Intermediate" prefix
  b) Queue => Subpartition
  c) Iterator => View

- [FLINK-1350] Adds a spillable result subpartition variant for BLOCKING
  results, which writes data to memory first and starts to spill
  (asynchronously) if not enough memory is available to produce the
  result in-memory only.

  Receiving tasks of BLOCKING results are only deployed after *all*
  partitions have been fully produced. PIPELINED and BLOCKING results cannot
  be mixed.

- [FLINK-1359] Adds simple state tracking to result partitions with
  notifications after partitions/subpartitions have been consumed. Each
  partition has to be consumed at least once before it can be released.

  Currently there is no notion of historic intermediate results, i.e. results
  are released as soon as they are consumed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d7acf36
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d7acf36
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d7acf36

Branch: refs/heads/master
Commit: 9d7acf3657cbd3fb0b238b20ba864b6a74774e40
Parents: 1930678
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Jan 6 17:11:08 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Mar 18 17:44:40 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/OptimizerNode.java       |   7 +-
 .../org/apache/flink/compiler/plan/Channel.java |   1 +
 .../apache/flink/compiler/plan/PlanNode.java    |   3 +-
 .../plantranslate/NepheleJobGraphGenerator.java |   5 +-
 .../flink/compiler/PipelineBreakerTest.java     | 176 ++---
 .../flink/configuration/ConfigConstants.java    |  15 +-
 .../java/org/apache/flink/util/AbstractID.java  |   6 +
 .../org/apache/flink/api/java/tuple/Tuple.java  |   2 +
 flink-runtime/pom.xml                           |   2 +-
 .../InputChannelDeploymentDescriptor.java       | 138 ++++
 .../InputGateDeploymentDescriptor.java          |  89 +++
 ...PartialInputChannelDeploymentDescriptor.java | 116 ++++
 .../deployment/PartialPartitionInfo.java        | 102 ---
 .../PartitionConsumerDeploymentDescriptor.java  | 102 ---
 .../PartitionDeploymentDescriptor.java          | 122 ----
 .../flink/runtime/deployment/PartitionInfo.java | 176 -----
 .../ResultPartitionDeploymentDescriptor.java    | 114 ++++
 .../deployment/ResultPartitionLocation.java     | 101 +++
 .../deployment/TaskDeploymentDescriptor.java    | 103 ++-
 .../flink/runtime/execution/Environment.java    |   6 +-
 .../runtime/execution/RuntimeEnvironment.java   |  40 +-
 .../flink/runtime/executiongraph/Execution.java | 215 ++++---
 .../runtime/executiongraph/ExecutionEdge.java   |   5 +
 .../runtime/executiongraph/ExecutionGraph.java  |  27 +-
 .../executiongraph/ExecutionJobVertex.java      |  32 +-
 .../runtime/executiongraph/ExecutionVertex.java | 214 ++++---
 .../executiongraph/IntermediateResult.java      |  52 +-
 .../IntermediateResultPartition.java            |  29 +
 .../io/disk/ChannelReaderInputViewIterator.java |   2 +-
 .../runtime/io/disk/FileChannelInputView.java   |   6 +-
 .../runtime/io/disk/FileChannelOutputView.java  |   6 +-
 .../io/disk/SeekableFileChannelInputView.java   |   6 +-
 .../flink/runtime/io/disk/SpillingBuffer.java   |  12 +-
 .../disk/iomanager/AbstractFileIOChannel.java   |   7 +-
 .../disk/iomanager/AsynchronousBlockReader.java |  22 +-
 .../disk/iomanager/AsynchronousBlockWriter.java |   6 +-
 .../AsynchronousBlockWriterWithCallback.java    |   2 +-
 .../iomanager/AsynchronousBufferFileReader.java |  48 ++
 .../AsynchronousBufferFileSegmentReader.java    |  46 ++
 .../iomanager/AsynchronousBufferFileWriter.java |  64 ++
 .../iomanager/AsynchronousFileIOChannel.java    | 271 ++++++--
 .../io/disk/iomanager/BlockChannelReader.java   |  22 +-
 .../io/disk/iomanager/BlockChannelWriter.java   |  10 +-
 .../BlockChannelWriterWithCallback.java         |  12 +-
 .../io/disk/iomanager/BufferFileReader.java     |  33 +
 .../disk/iomanager/BufferFileSegmentReader.java |  31 +
 .../io/disk/iomanager/BufferFileWriter.java     |  38 ++
 .../disk/iomanager/ChannelReaderInputView.java  |  10 +-
 .../disk/iomanager/ChannelWriterOutputView.java |   8 +-
 .../io/disk/iomanager/FileIOChannel.java        |   3 +
 .../runtime/io/disk/iomanager/FileSegment.java  |  52 ++
 .../HeaderlessChannelReaderInputView.java       |   4 +-
 .../runtime/io/disk/iomanager/IOManager.java    |  32 +-
 .../io/disk/iomanager/IOManagerAsync.java       |  32 +-
 .../io/disk/iomanager/QueuingCallback.java      |  14 +-
 .../iomanager/SynchronousBufferFileReader.java  |  85 +++
 .../iomanager/SynchronousFileIOChannel.java     |   2 +-
 .../flink/runtime/io/network/ConnectionID.java  |  85 +++
 .../runtime/io/network/ConnectionManager.java   |  13 +-
 .../io/network/LocalConnectionManager.java      |   9 +-
 .../runtime/io/network/NetworkEnvironment.java  |  53 +-
 .../flink/runtime/io/network/RemoteAddress.java | 122 ----
 .../runtime/io/network/TaskEventDispatcher.java |  57 +-
 .../api/reader/AbstractRecordReader.java        |   3 +-
 ...llingAdaptiveSpanningRecordDeserializer.java | 637 +++++++++++++++++++
 .../io/network/api/writer/BufferWriter.java     | 119 ----
 .../io/network/api/writer/RecordWriter.java     |  13 +-
 .../api/writer/ResultPartitionWriter.java       | 115 ++++
 .../flink/runtime/io/network/buffer/Buffer.java |  14 +-
 .../runtime/io/network/buffer/BufferPool.java   |   2 +-
 .../io/network/buffer/BufferPoolOwner.java      |   2 +-
 .../io/network/buffer/BufferProvider.java       |   2 +
 .../io/network/buffer/LocalBufferPool.java      |  25 +-
 .../io/network/buffer/NetworkBufferPool.java    |   8 +-
 .../runtime/io/network/netty/NettyConfig.java   |  31 +-
 .../network/netty/NettyConnectionManager.java   |  17 +-
 .../runtime/io/network/netty/NettyMessage.java  |  36 +-
 .../network/netty/PartitionRequestClient.java   |  27 +-
 .../netty/PartitionRequestClientFactory.java    |  42 +-
 .../network/netty/PartitionRequestProtocol.java |  11 +-
 .../io/network/netty/PartitionRequestQueue.java |  69 +-
 .../netty/PartitionRequestServerHandler.java    |  44 +-
 .../partition/IntermediateResultPartition.java  | 319 ----------
 .../IntermediateResultPartitionManager.java     | 139 ----
 .../IntermediateResultPartitionProvider.java    |  37 --
 .../partition/PipelinedSubpartition.java        | 218 +++++++
 .../partition/PipelinedSubpartitionView.java    |  74 +++
 .../io/network/partition/ResultPartition.java   | 421 ++++++++++++
 .../io/network/partition/ResultPartitionID.java |  77 +++
 .../partition/ResultPartitionManager.java       | 146 +++++
 .../partition/ResultPartitionProvider.java      |  33 +
 .../network/partition/ResultPartitionType.java  |  62 ++
 .../network/partition/ResultSubpartition.java   |  81 +++
 .../partition/ResultSubpartitionView.java       |  59 ++
 .../partition/SpillableSubpartition.java        | 222 +++++++
 .../partition/SpillableSubpartitionView.java    | 163 +++++
 .../SpilledSubpartitionViewAsyncIO.java         | 365 +++++++++++
 .../SpilledSubpartitionViewSyncIO.java          | 177 ++++++
 .../partition/consumer/InputChannel.java        |  38 +-
 .../partition/consumer/LocalInputChannel.java   |  75 ++-
 .../partition/consumer/RemoteInputChannel.java  |  29 +-
 .../partition/consumer/SingleInputGate.java     | 176 ++---
 .../partition/consumer/UnknownInputChannel.java |  33 +-
 .../IllegalQueueIteratorRequestException.java   |  33 -
 .../queue/IntermediateResultPartitionQueue.java |  53 --
 ...ntermediateResultPartitionQueueIterator.java |  74 ---
 .../queue/PipelinedPartitionQueue.java          | 185 ------
 ...llingAdaptiveSpanningRecordDeserializer.java | 637 -------------------
 .../iterative/io/SerializedUpdateBuffer.java    |   8 +-
 .../iterative/task/IterationHeadPactTask.java   |   4 +-
 .../runtime/jobgraph/AbstractJobVertex.java     |  13 +-
 .../runtime/jobgraph/IntermediateDataSet.java   |  24 +-
 .../IntermediateResultPartitionType.java        |  51 --
 .../flink/runtime/operators/TempBarrier.java    |   1 +
 .../runtime/operators/hash/HashPartition.java   |  12 +-
 .../operators/hash/MutableHashTable.java        |   4 +-
 .../operators/hash/ReOpenableHashPartition.java |   2 +-
 .../sort/CombiningUnilateralSortMerger.java     |   4 +-
 .../operators/sort/UnilateralSortMerger.java    |   6 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  37 +-
 .../util/AtomicDisposableReferenceCounter.java  |  55 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  20 +-
 .../runtime/messages/JobManagerMessages.scala   |   9 +-
 .../runtime/messages/TaskManagerMessages.scala  |  28 +-
 .../NetworkEnvironmentConfiguration.scala       |   2 +
 .../flink/runtime/taskmanager/TaskManager.scala |  39 +-
 .../TaskDeploymentDescriptorTest.java           |   6 +-
 .../ExecutionGraphDeploymentTest.java           |  14 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |  24 +-
 .../io/disk/FileChannelStreamsITCase.java       |  20 +-
 .../runtime/io/disk/FileChannelStreamsTest.java |   4 +-
 .../disk/SeekableFileChannelInputViewTest.java  |   2 +-
 .../AsynchronousBufferFileWriterTest.java       | 178 ++++++
 .../AsynchronousFileIOChannelTest.java          | 429 +++++++++++++
 .../AsynchronousFileIOChannelsTest.java         | 176 -----
 .../BufferFileWriterFileSegmentReaderTest.java  | 198 ++++++
 .../iomanager/BufferFileWriterReaderTest.java   | 225 +++++++
 .../io/disk/iomanager/IOManagerAsyncTest.java   |  31 +-
 .../io/disk/iomanager/IOManagerITCase.java      |   7 +-
 .../IOManagerPerformanceBenchmark.java          |   4 +-
 .../io/disk/iomanager/IOManagerTest.java        |  23 +-
 .../network/api/reader/AbstractReaderTest.java  |   1 +
 .../io/network/api/reader/BufferReaderTest.java |   7 +-
 .../IteratorWrappingMockSingleInputGate.java    | 103 ---
 .../IteratorWrappingTestSingleInputGate.java    | 103 +++
 .../SpanningRecordSerializationTest.java        |   1 -
 .../network/buffer/BufferPoolFactoryTest.java   |   2 +-
 .../runtime/io/network/buffer/BufferTest.java   |  52 --
 .../io/network/buffer/LocalBufferPoolTest.java  |  84 ++-
 .../netty/NettyMessageSerializationTest.java    |   9 +-
 .../PartitionRequestClientFactoryTest.java      |  10 +-
 .../PartitionRequestClientHandlerTest.java      |   1 +
 .../partition/PipelinedSubpartitionTest.java    | 278 ++++++++
 .../partition/SpillableSubpartitionTest.java    |  49 ++
 .../SpilledSubpartitionViewAsyncIOTest.java     |  65 ++
 .../SpilledSubpartitionViewSyncIOTest.java      | 103 +++
 .../partition/SpilledSubpartitionViewTest.java  | 181 ++++++
 .../network/partition/SubpartitionTestBase.java |  70 ++
 .../partition/consumer/SingleInputGateTest.java |  41 +-
 .../partition/consumer/UnionInputGateTest.java  |   8 +-
 .../queue/PipelinedPartitionQueueTest.java      | 224 -------
 .../network/serialization/LargeRecordsTest.java |   1 +
 .../runtime/io/network/util/MockConsumer.java   | 100 ---
 .../io/network/util/MockInputChannel.java       | 130 ----
 .../network/util/MockNotificationListener.java  |  52 --
 .../runtime/io/network/util/MockProducer.java   | 106 ---
 .../io/network/util/MockSingleInputGate.java    | 137 ----
 .../io/network/util/TestBufferFactory.java      |  88 +++
 .../io/network/util/TestConsumerCallback.java   | 103 +++
 .../util/TestInfiniteBufferProvider.java        |  81 +++
 .../io/network/util/TestInputChannel.java       | 130 ++++
 .../network/util/TestNotificationListener.java  |  73 +++
 .../io/network/util/TestPartitionProducer.java  | 107 ++++
 .../network/util/TestPooledBufferProvider.java  | 152 +++++
 .../io/network/util/TestProducerSource.java     |  32 +
 .../io/network/util/TestSingleInputGate.java    | 140 ++++
 .../network/util/TestSubpartitionConsumer.java  | 121 ++++
 .../network/util/TestSubpartitionProducer.java  | 105 +++
 .../runtime/io/network/util/TestTaskEvent.java  |   3 +
 .../runtime/operators/DataSinkTaskTest.java     |  10 +-
 .../runtime/operators/DataSourceTaskTest.java   |   4 +-
 .../operators/chaining/ChainTaskTest.java       |   4 +-
 .../operators/testutils/MockEnvironment.java    |  20 +-
 .../operators/testutils/TaskTestBase.java       |  10 +-
 .../TaskManagerProcessReapingTest.java          |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  70 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  54 +-
 .../runtime/testutils/DiscardingRecycler.java   |   4 +-
 .../AtomicDisposableReferenceCounterTest.java   |  27 +-
 .../flink/streaming/io/StreamRecordWriter.java  |  10 +-
 .../io/StreamingAbstractRecordReader.java       |   2 +-
 .../flink/test/util/JavaProgramTestBase.java    |  68 +-
 .../exampleJavaPrograms/WordCountITCase.java    |   6 +-
 .../test/iterative/KMeansForTestITCase.java     | 276 ++++++++
 .../ProcessFailureBatchRecoveryITCase.java      |   2 +-
 ...ConsumePipelinedAndBlockingResultITCase.java |  67 ++
 .../flink/test/runtime/JoinDeadlockITCase.java  |  70 ++
 .../test/runtime/SelfJoinDeadlockITCase.java    | 104 +++
 .../flink/test/util/testjar/KMeansForTest.java  |  34 +-
 199 files changed, 9363 insertions(+), 4562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
index 16c60a4..5aaf114 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
@@ -460,12 +460,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	public InterestingProperties getInterestingProperties() {
 		return this.intProps;
 	}
-	
-	
+
+	@Override
 	public long getEstimatedOutputSize() {
 		return this.estimatedOutputSize;
 	}
 
+	@Override
 	public long getEstimatedNumRecords() {
 		return this.estimatedNumRecords;
 	}
@@ -478,6 +479,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		this.estimatedNumRecords = estimatedNumRecords;
 	}
 	
+	@Override
 	public float getEstimatedAvgWidthPerOutputRecord() {
 		if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) {
 			return ((float) this.estimatedOutputSize) / this.estimatedNumRecords;
@@ -941,6 +943,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		if (this.closedBranchingNodes == null) { 
 			this.closedBranchingNodes = new HashSet<OptimizerNode>();
 		}
+
 		this.closedBranchingNodes.add(alreadyClosed);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
index 3903c84..454cf30 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
@@ -105,6 +105,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	 *
 	 * @return The source.
 	 */
+	@Override
 	public PlanNode getSource() {
 		return this.source;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-compiler/src/main/java/org/apache/flink/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/PlanNode.java
index 4f72144..0b74add 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/PlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/PlanNode.java
@@ -381,8 +381,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	public List<Channel> getOutgoingChannels() {
 		return this.outChannels;
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 	//                                Miscellaneous
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 4681d0d..052d439 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1068,10 +1068,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			default:
 				throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
 		}
-		
+
 		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern);
-		
-//		sourceVertex.conn/ectTo(targetVertex, channelType, distributionPattern);
 
 		// -------------- configure the source task's ship strategy strategies in task config --------------
 		final int outputIndex = sourceConfig.getNumOutputs();
@@ -1140,6 +1138,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			final TempMode tm = channel.getTempMode();
 
 			boolean needsMemory = false;
+			// Don't add a pipeline breaker if the data exchange is already blocking.
 			if (tm.breaksPipeline()) {
 				config.setInputAsynchronouslyMaterialized(inputNum, true);
 				needsMemory = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
index e034696..36eb85b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
@@ -18,21 +18,29 @@
 
 package org.apache.flink.compiler;
 
-import static org.junit.Assert.*;
-
-import org.apache.flink.compiler.testfunctions.IdentityMapper;
-import org.apache.flink.compiler.testfunctions.SelectOneReducer;
-import org.junit.Test;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
 import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class PipelineBreakerTest extends CompilerTestBase {
@@ -42,21 +50,21 @@ public class PipelineBreakerTest extends CompilerTestBase {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.setDegreeOfParallelism(64);
-			
+
 			DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-			
+
 			DataSet<Long> result = source.map(new IdentityMapper<Long>())
-										.map(new IdentityMapper<Long>())
-											.withBroadcastSet(source, "bc");
-			
+					.map(new IdentityMapper<Long>())
+					.withBroadcastSet(source, "bc");
+
 			result.print();
-			
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-			
+
 			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
 		}
 		catch (Exception e) {
@@ -64,33 +72,33 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPipelineBreakerBroadcastedAllReduce() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.setDegreeOfParallelism(64);
-			
+
 			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-			
+
 			DataSet<Long> bcInput1 = sourceWithMapper
-										.map(new IdentityMapper<Long>())
-										.reduce(new SelectOneReducer<Long>());
+					.map(new IdentityMapper<Long>())
+					.reduce(new SelectOneReducer<Long>());
 			DataSet<Long> bcInput2 = env.generateSequence(1, 10);
-			
+
 			DataSet<Long> result = sourceWithMapper
 					.map(new IdentityMapper<Long>())
-							.withBroadcastSet(bcInput1, "bc1")
-							.withBroadcastSet(bcInput2, "bc2");
-			
+					.withBroadcastSet(bcInput1, "bc1")
+					.withBroadcastSet(bcInput2, "bc2");
+
 			result.print();
-			
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-			
+
 			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
 		}
 		catch (Exception e) {
@@ -98,39 +106,39 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPipelineBreakerBroadcastedPartialSolution() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.setDegreeOfParallelism(64);
-			
-			
+
+
 			DataSet<Long> initialSource = env.generateSequence(1, 10);
 			IterativeDataSet<Long> iteration = initialSource.iterate(100);
-			
-			
+
+
 			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-			
+
 			DataSet<Long> bcInput1 = sourceWithMapper
-										.map(new IdentityMapper<Long>())
-										.reduce(new SelectOneReducer<Long>());
-			
+					.map(new IdentityMapper<Long>())
+					.reduce(new SelectOneReducer<Long>());
+
 			DataSet<Long> result = sourceWithMapper
 					.map(new IdentityMapper<Long>())
-							.withBroadcastSet(iteration, "bc2")
-							.withBroadcastSet(bcInput1, "bc1");
-							
-			
+					.withBroadcastSet(iteration, "bc2")
+					.withBroadcastSet(bcInput1, "bc1");
+
+
 			iteration.closeWith(result).print();
-			
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
 			SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
-			
+
 			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
 		}
 		catch (Exception e) {
@@ -138,98 +146,98 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPilelineBreakerWithCross() {
 		try {
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.setDegreeOfParallelism(64);
-				
+
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
-				
-				Configuration conf= new Configuration();
+
+				Configuration conf = new Configuration();
 				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
 				initialSource
-					.map(new IdentityMapper<Long>())
-					.cross(initialSource).withParameters(conf)
-					.print();
-				
-				
+						.map(new IdentityMapper<Long>())
+						.cross(initialSource).withParameters(conf)
+						.print();
+
+
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);
 				SinkPlanNode sink = op.getDataSinks().iterator().next();
 				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
-				
+
 				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
 			}
-			
+
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.setDegreeOfParallelism(64);
-				
+
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
-				
-				Configuration conf= new Configuration();
+
+				Configuration conf = new Configuration();
 				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
 				initialSource
-					.map(new IdentityMapper<Long>())
-					.cross(initialSource).withParameters(conf)
-					.print();
-				
-				
+						.map(new IdentityMapper<Long>())
+						.cross(initialSource).withParameters(conf)
+						.print();
+
+
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);
-				
+
 				SinkPlanNode sink = op.getDataSinks().iterator().next();
 				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
-				
+
 				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
 			}
-			
+
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.setDegreeOfParallelism(64);
-				
+
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
-				
-				Configuration conf= new Configuration();
+
+				Configuration conf = new Configuration();
 				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
 				initialSource
-					.map(new IdentityMapper<Long>())
-					.cross(initialSource).withParameters(conf)
-					.print();
-				
-				
+						.map(new IdentityMapper<Long>())
+						.cross(initialSource).withParameters(conf)
+						.print();
+
+
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);
-				
+
 				SinkPlanNode sink = op.getDataSinks().iterator().next();
 				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
-				
+
 				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
 			}
-			
+
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				env.setDegreeOfParallelism(64);
-				
+
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
-				
-				Configuration conf= new Configuration();
+
+				Configuration conf = new Configuration();
 				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
 				initialSource
-					.map(new IdentityMapper<Long>())
-					.cross(initialSource).withParameters(conf)
-					.print();
-				
-				
+						.map(new IdentityMapper<Long>())
+						.cross(initialSource).withParameters(conf)
+						.print();
+
+
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);
-				
+
 				SinkPlanNode sink = op.getDataSinks().iterator().next();
 				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
-				
+
 				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dd920d7..4fc89c3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -122,8 +122,7 @@ public final class ConfigConstants {
 	 * The key for the config parameter defining whether the memory manager allocates memory lazy.
 	 */
 	public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
-	
-	
+
 	/**
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
 	 * number of possible tasks and shuffles.
@@ -136,6 +135,12 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
 
 	/**
+	 * The implementation to use for spillable/spilled intermediate results, which have both
+	 * synchronous and asynchronous implementations: "sync" or "async".
+	 */
+	public static final String TASK_MANAGER_NETWORK_DEFAULT_IO_MODE = "taskmanager.network.defaultIOMode";
+
+	/**
 	 * The config parameter defining the number of task slots of a task manager.
 	 */
 	public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";
@@ -463,6 +468,12 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
 
 	/**
+	 * The default implementation to use for spillable/spilled intermediate results, which have both
+	 * synchronous and asynchronous implementations: "sync" or "async".
+	 */
+	public static final String DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE = "sync";
+
+	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 */
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
index a93c7ed..250869f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -174,6 +174,12 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 		longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
 		return StringUtils.byteToHexString(ba);
 	}
+
+	public String toShortString() {
+		final byte[] ba = new byte[SIZE_OF_LONG];
+		longToByteArray(upperPart, ba, 0);
+		return StringUtils.byteToHexString(ba);
+	}
 	
 	@Override
 	public int compareTo(AbstractID o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 145d215..9da50c7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -108,5 +108,7 @@ public abstract class Tuple implements java.io.Serializable {
 	private static final Class<?>[] CLASSES = new Class<?>[] {
 		Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class
 	};
+
+
 	// END_OF_TUPLE_DEPENDENT_CODE
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 14e27aa..ad570b8 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -90,7 +90,7 @@ under the License.
 		<dependency>
 			<groupId>io.netty</groupId>
 			<artifactId>netty-all</artifactId>
-			<version>4.0.24.Final</version>
+			<version>4.0.26.Final</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
new file mode 100644
index 0000000..7592231
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Deployment descriptor for a single input channel instance.
+ *
+ * <p> Each input channel consumes a single subpartition. The index of the subpartition to consume
+ * is part of the {@link InputGateDeploymentDescriptor} as it is the same for each input channel of
+ * the respective input gate.
+ *
+ * @see InputChannel
+ * @see SingleInputGate
+ */
+public class InputChannelDeploymentDescriptor implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(InputChannelDeploymentDescriptor.class);
+
+	/** The ID of the partition the input channel is going to consume. */
+	private final ResultPartitionID consumedPartitionId;
+
+	/** The location of the partition the input channel is going to consume. */
+	private final ResultPartitionLocation consumedPartitionLocation;
+
+	/**
+	 * Creates a deployment descriptor for an input channel consuming the given partition.
+	 *
+	 * @param consumedPartitionId ID of the partition to consume (must not be null)
+	 * @param consumedPartitionLocation location of the partition to consume (must not be null)
+	 */
+	public InputChannelDeploymentDescriptor(
+			ResultPartitionID consumedPartitionId,
+			ResultPartitionLocation consumedPartitionLocation) {
+
+		this.consumedPartitionId = checkNotNull(consumedPartitionId);
+		this.consumedPartitionLocation = checkNotNull(consumedPartitionLocation);
+	}
+
+	public ResultPartitionID getConsumedPartitionId() {
+		return consumedPartitionId;
+	}
+
+	public ResultPartitionLocation getConsumedPartitionLocation() {
+		return consumedPartitionLocation;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("InputChannelDeploymentDescriptor [consumed partition id: %s, " +
+						"consumed partition location: %s]",
+				consumedPartitionId, consumedPartitionLocation);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an input channel deployment descriptor for each partition.
+	 *
+	 * <p> A partition location is resolved to local or remote only if the partition is consumable
+	 * and its producer has an assigned slot and is RUNNING or FINISHED. Otherwise, the location is
+	 * unknown.
+	 *
+	 * @param edges edges to the consumed partitions (one descriptor is created per edge)
+	 * @param consumerSlot slot of the consuming task, used to tell local from remote partitions
+	 * @return the created descriptors in the order of the given edges
+	 */
+	public static InputChannelDeploymentDescriptor[] fromEdges(
+			ExecutionEdge[] edges, SimpleSlot consumerSlot) {
+
+		final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
+
+		// Each edge is connected to a different result partition
+		for (int i = 0; i < edges.length; i++) {
+			final IntermediateResultPartition consumedPartition = edges[i].getSource();
+			final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
+
+			final ExecutionState producerState = producer.getState();
+			final SimpleSlot producerSlot = producer.getAssignedResource();
+
+			final ResultPartitionLocation partitionLocation;
+
+			// The producing task needs to be RUNNING or already FINISHED
+			if (consumedPartition.isConsumable() && producerSlot != null &&
+					(producerState == ExecutionState.RUNNING
+							|| producerState == ExecutionState.FINISHED)) {
+
+				final Instance partitionInstance = producerSlot.getInstance();
+
+				if (partitionInstance.equals(consumerSlot.getInstance())) {
+					// Consuming task is deployed to the same instance as the partition => local
+					partitionLocation = ResultPartitionLocation.createLocal();
+				}
+				else {
+					// Different instances => remote
+					final ConnectionID connectionId = new ConnectionID(
+							partitionInstance.getInstanceConnectionInfo(),
+							consumedPartition.getIntermediateResult().getConnectionIndex());
+
+					partitionLocation = ResultPartitionLocation.createRemote(connectionId);
+				}
+			}
+			else {
+				// The producing task might not have registered the partition yet
+				partitionLocation = ResultPartitionLocation.createUnknown();
+			}
+
+			final ResultPartitionID consumedPartitionId = new ResultPartitionID(
+					consumedPartition.getPartitionId(), producer.getAttemptId());
+
+			icdd[i] = new InputChannelDeploymentDescriptor(
+					consumedPartitionId, partitionLocation);
+		}
+
+		// Rendering both arrays is not free: only build the strings if they are actually logged
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
+		}
+
+		return icdd;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
new file mode 100644
index 0000000..77b072a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Deployment descriptor for a single input gate instance.
+ *
+ * <p> Each input gate consumes partitions of a single intermediate result. The consumed
+ * subpartition index is the same for each consumed partition.
+ *
+ * @see SingleInputGate
+ */
+public class InputGateDeploymentDescriptor implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
+	 * intermediate result specified by this ID. This ID also identifies the input gate at the
+	 * consuming task.
+	 */
+	private final IntermediateDataSetID consumedResultId;
+
+	/**
+	 * The index of the consumed subpartition of each consumed partition. This index depends on the
+	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
+	 */
+	private final int consumedSubpartitionIndex;
+
+	/** An input channel for each consumed subpartition. */
+	private final InputChannelDeploymentDescriptor[] inputChannels;
+
+	/**
+	 * Creates a deployment descriptor for an input gate consuming the given intermediate result.
+	 *
+	 * @param consumedResultId ID of the consumed intermediate result (must not be null)
+	 * @param consumedSubpartitionIndex index of the consumed subpartition (must not be negative)
+	 * @param inputChannels descriptors of the input channels of the gate (must not be null)
+	 */
+	public InputGateDeploymentDescriptor(
+			IntermediateDataSetID consumedResultId,
+			int consumedSubpartitionIndex,
+			InputChannelDeploymentDescriptor[] inputChannels) {
+
+		this.consumedResultId = checkNotNull(consumedResultId);
+
+		checkArgument(consumedSubpartitionIndex >= 0);
+		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
+
+		this.inputChannels = checkNotNull(inputChannels);
+	}
+
+	public IntermediateDataSetID getConsumedResultId() {
+		return consumedResultId;
+	}
+
+	public int getConsumedSubpartitionIndex() {
+		return consumedSubpartitionIndex;
+	}
+
+	public InputChannelDeploymentDescriptor[] getInputChannelDeploymentDescriptors() {
+		return inputChannels;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("InputGateDeploymentDescriptor [result id: %s, " +
+						"consumed subpartition index: %d, input channels: %s]",
+				consumedResultId.toShortString(), consumedSubpartitionIndex,
+				Arrays.toString(inputChannels));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
new file mode 100644
index 0000000..adea18d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Partial deployment descriptor for a single input channel instance.
+ *
+ * <p> This deployment descriptor is created in {@link Execution#scheduleOrUpdateConsumers(java.util.List)},
+ * if the consumer instance is not yet clear. Once the instance on which the consumer runs is known,
+ * the deployment descriptor is updated by completing the partition location.
+ */
+public class PartialInputChannelDeploymentDescriptor {
+
+	/** The result ID identifies the input gate to update. */
+	private final IntermediateDataSetID resultId;
+
+	/** The partition ID identifies the input channel to update. */
+	private final ResultPartitionID partitionID;
+
+	/** The connection info of the resource assigned to the producer of the partition. */
+	private final InstanceConnectionInfo partitionConnectionInfo;
+
+	/** The connection index of the partition's intermediate result. */
+	private final int partitionConnectionIndex;
+
+	public PartialInputChannelDeploymentDescriptor(
+			IntermediateDataSetID resultId,
+			ResultPartitionID partitionID,
+			InstanceConnectionInfo partitionConnectionInfo,
+			int partitionConnectionIndex) {
+
+		this.resultId = checkNotNull(resultId);
+		this.partitionID = checkNotNull(partitionID);
+		this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo);
+		this.partitionConnectionIndex = partitionConnectionIndex;
+	}
+
+	/**
+	 * Creates a channel deployment descriptor by completing the partition location: local if the
+	 * consumer has the same connection info as the partition, remote otherwise.
+	 * @see InputChannelDeploymentDescriptor
+	 */
+	public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
+			Execution consumerExecution) {
+
+		checkNotNull(consumerExecution, "Consumer execution null");
+
+		InstanceConnectionInfo consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
+
+		checkNotNull(consumerConnectionInfo, "Consumer connection info null");
+
+		final ResultPartitionLocation partitionLocation;
+
+		if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
+			partitionLocation = ResultPartitionLocation.createLocal();
+		}
+		else {
+			partitionLocation = ResultPartitionLocation.createRemote(
+					new ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
+		}
+
+		return new InputChannelDeploymentDescriptor(partitionID, partitionLocation);
+	}
+
+	public IntermediateDataSetID getResultId() {
+		return resultId;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a partial input channel deployment descriptor for the given partition and producer.
+	 */
+	public static PartialInputChannelDeploymentDescriptor fromEdge(
+			IntermediateResultPartition partition,
+			Execution producer) {
+
+		final ResultPartitionID partitionId = new ResultPartitionID(
+				partition.getPartitionId(), producer.getAttemptId());
+
+		final IntermediateResult result = partition.getIntermediateResult();
+
+		final IntermediateDataSetID resultId = result.getId();
+		final InstanceConnectionInfo partitionConnectionInfo = producer.getAssignedResourceLocation();
+		final int partitionConnectionIndex = result.getConnectionIndex();
+
+		return new PartialInputChannelDeploymentDescriptor(
+				resultId, partitionId, partitionConnectionInfo, partitionConnectionIndex);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
deleted file mode 100644
index a27c976..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.io.network.RemoteAddress;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-/**
- * This class contains the partial partition info which is created if the consumer instance is not
- * yet clear. Once the instance on which the consumer runs is known, the complete partition info
- * can be computed.
- */
-public class PartialPartitionInfo {
-	private final IntermediateDataSetID intermediateDataSetID;
-
-	private final IntermediateResultPartitionID partitionID;
-
-	private final ExecutionAttemptID producerExecutionID;
-
-	private final InstanceConnectionInfo producerInstanceConnectionInfo;
-
-	private final int partitionConnectionIndex;
-
-	public PartialPartitionInfo(IntermediateDataSetID intermediateDataSetID,
-								IntermediateResultPartitionID partitionID,
-								ExecutionAttemptID executionID,
-								InstanceConnectionInfo producerInstanceConnectionInfo,
-								int partitionConnectionIndex) {
-		this.intermediateDataSetID = intermediateDataSetID;
-		this.partitionID = partitionID;
-		this.producerExecutionID = executionID;
-		this.producerInstanceConnectionInfo = producerInstanceConnectionInfo;
-		this.partitionConnectionIndex = partitionConnectionIndex;
-	}
-
-	public PartitionInfo createPartitionInfo(Execution consumerExecution) throws IllegalStateException {
-		if(consumerExecution != null){
-			PartitionInfo.PartitionLocation producerLocation;
-
-			RemoteAddress resolvedProducerAddress;
-
-			if(consumerExecution.getAssignedResourceLocation().equals(
-					producerInstanceConnectionInfo)) {
-				resolvedProducerAddress = null;
-				producerLocation = PartitionInfo.PartitionLocation.LOCAL;
-			} else {
-				resolvedProducerAddress = new RemoteAddress(producerInstanceConnectionInfo,
-						partitionConnectionIndex);
-
-				producerLocation = PartitionInfo.PartitionLocation.REMOTE;
-			}
-
-			return new PartitionInfo(partitionID, producerExecutionID, producerLocation,
-					resolvedProducerAddress);
-
-		} else {
-			throw new RuntimeException("Cannot create partition info, because consumer execution " +
-					"is null.");
-		}
-	}
-
-	public IntermediateDataSetID getIntermediateDataSetID() {
-		return intermediateDataSetID;
-	}
-
-	public static PartialPartitionInfo fromEdge(final ExecutionEdge edge){
-		IntermediateResultPartition partition = edge.getSource();
-		IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
-
-		IntermediateDataSetID intermediateDataSetID = partition.getIntermediateResult().getId();
-
-		Execution producer = partition.getProducer().getCurrentExecutionAttempt();
-		ExecutionAttemptID producerExecutionID = producer.getAttemptId();
-
-		return new PartialPartitionInfo(intermediateDataSetID, partitionID, producerExecutionID,
-				producer.getAssignedResourceLocation(),
-				partition.getIntermediateResult().getConnectionIndex());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
deleted file mode 100644
index 7300da4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * A partition consumer deployment descriptor combines information of all partitions, which are
- * consumed by a single reader.
- */
-public class PartitionConsumerDeploymentDescriptor implements IOReadableWritable, Serializable {
-
-	private IntermediateDataSetID resultId;
-
-	private PartitionInfo[] partitions;
-
-	private int queueIndex;
-
-	public PartitionConsumerDeploymentDescriptor() {
-	}
-
-	public PartitionConsumerDeploymentDescriptor(IntermediateDataSetID resultId, PartitionInfo[] partitions, int queueIndex) {
-		this.resultId = resultId;
-		this.partitions = partitions;
-		this.queueIndex = queueIndex;
-	}
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	public IntermediateDataSetID getResultId() {
-		return resultId;
-	}
-
-	public PartitionInfo[] getPartitions() {
-		return partitions;
-	}
-
-	public int getQueueIndex() {
-		return queueIndex;
-	}
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		resultId.write(out);
-		out.writeInt(partitions.length);
-		for (PartitionInfo partition : partitions) {
-			partition.write(out);
-		}
-
-		out.writeInt(queueIndex);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		resultId = new IntermediateDataSetID();
-		resultId.read(in);
-
-		partitions = new PartitionInfo[in.readInt()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = new PartitionInfo();
-			partitions[i].read(in);
-		}
-
-		this.queueIndex = in.readInt();
-	}
-
-	@Override
-	public String toString() {
-		return String.format("PartitionConsumerDeploymentDescriptor(ResultID: %s, " +
-				"Queue index: %d, Partitions: %s)", resultId, queueIndex,
-				Arrays.toString(partitions));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
deleted file mode 100644
index 37651c9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A partition deployment descriptor combines information for a produced intermediate result
- * partition.
- */
-public class PartitionDeploymentDescriptor implements IOReadableWritable, Serializable {
-
-	private final IntermediateDataSetID resultId;
-
-	private final IntermediateResultPartitionID partitionId;
-
-	private IntermediateResultPartitionType partitionType;
-
-	private int numberOfQueues;
-
-	public PartitionDeploymentDescriptor() {
-		this.resultId = new IntermediateDataSetID();
-		this.partitionId = new IntermediateResultPartitionID();
-		this.numberOfQueues = -1;
-	}
-
-	public PartitionDeploymentDescriptor(IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, IntermediateResultPartitionType partitionType, int numberOfQueues) {
-		this.resultId = resultId;
-		this.partitionId = partitionId;
-		this.partitionType = partitionType;
-		this.numberOfQueues = numberOfQueues;
-	}
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	public IntermediateDataSetID getResultId() {
-		return resultId;
-	}
-
-	public IntermediateResultPartitionID getPartitionId() {
-		return partitionId;
-	}
-
-	public IntermediateResultPartitionType getPartitionType() {
-		return partitionType;
-	}
-
-	public int getNumberOfQueues() {
-		return numberOfQueues;
-	}
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		resultId.write(out);
-		partitionId.write(out);
-		out.writeInt(partitionType.ordinal());
-		out.writeInt(numberOfQueues);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		resultId.read(in);
-		partitionId.read(in);
-		partitionType = IntermediateResultPartitionType.values()[in.readInt()];
-		numberOfQueues = in.readInt();
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static PartitionDeploymentDescriptor fromIntermediateResultPartition(IntermediateResultPartition partition) {
-
-		IntermediateResultPartitionID partitionId = partition.getPartitionId();
-
-		// The produced data is partitioned at runtime among a number of queues.
-		// If no consumers are known at this point, we use a single queue,
-		// otherwise we have a queue for each consumer sub task.
-		int numberOfQueues = 1;
-
-		if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
-			numberOfQueues = partition.getConsumers().get(0).size();
-		}
-
-		return new PartitionDeploymentDescriptor(partition.getIntermediateResult().getId(), partitionId, partition.getIntermediateResult().getResultType(), numberOfQueues);
-	}
-
-	@Override
-	public String toString() {
-		return String.format("PartitionDeploymentDescriptor(ResultID: %s, partitionID: %s, " +
-				"Partition type: %s)", resultId, partitionId, partitionType);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
deleted file mode 100644
index 6a30853..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.io.network.RemoteAddress;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A partition info instance contains all information necessary for a reader to create an input
- * channel to request a partition at runtime.
- */
-public class PartitionInfo implements IOReadableWritable, Serializable {
-
-	private static Logger LOG = LoggerFactory.getLogger(PartitionInfo.class);
-
-	public enum PartitionLocation {
-		LOCAL, REMOTE, UNKNOWN
-	}
-
-	private final IntermediateResultPartitionID partitionId;
-
-	private ExecutionAttemptID producerExecutionId;
-
-	private PartitionLocation producerLocation;
-
-	private RemoteAddress producerAddress; // != null, iff known remote producer
-
-	public PartitionInfo(IntermediateResultPartitionID partitionId, ExecutionAttemptID producerExecutionId, PartitionLocation producerLocation, RemoteAddress producerAddress) {
-		this.partitionId = checkNotNull(partitionId);
-		this.producerExecutionId = checkNotNull(producerExecutionId);
-		this.producerLocation = checkNotNull(producerLocation);
-		this.producerAddress = producerAddress;
-	}
-
-	public PartitionInfo() {
-		this.partitionId = new IntermediateResultPartitionID();
-		this.producerExecutionId = new ExecutionAttemptID();
-		this.producerLocation = PartitionLocation.UNKNOWN;
-		this.producerAddress = null;
-	}
-
-	// ------------------------------------------------------------------------
-	// Properties
-	// ------------------------------------------------------------------------
-
-	public IntermediateResultPartitionID getPartitionId() {
-		return partitionId;
-	}
-
-	public ExecutionAttemptID getProducerExecutionId() {
-		return producerExecutionId;
-	}
-
-	public PartitionLocation getProducerLocation() {
-		return producerLocation;
-	}
-
-	public RemoteAddress getProducerAddress() {
-		return producerAddress;
-	}
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		partitionId.write(out);
-		producerExecutionId.write(out);
-		out.writeInt(producerLocation.ordinal());
-		if (producerLocation == PartitionLocation.REMOTE) {
-			producerAddress.write(out);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		partitionId.read(in);
-		producerExecutionId.read(in);
-		producerLocation = PartitionLocation.values()[in.readInt()];
-		if (producerLocation == PartitionLocation.REMOTE) {
-			producerAddress = new RemoteAddress();
-			producerAddress.read(in);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static PartitionInfo fromEdge(ExecutionEdge edge, SimpleSlot consumerSlot) {
-		IntermediateResultPartition partition = edge.getSource();
-		IntermediateResultPartitionID partitionId = partition.getPartitionId();
-
-		// Intermediate result partition producer
-		Execution producer = partition.getProducer().getCurrentExecutionAttempt();
-
-		ExecutionAttemptID producerExecutionId = producer.getAttemptId();
-		RemoteAddress producerAddress = null;
-		PartitionLocation producerLocation = PartitionLocation.UNKNOWN;
-
-		SimpleSlot producerSlot = producer.getAssignedResource();
-		ExecutionState producerState = producer.getState();
-
-		// The producer needs to be running, otherwise the consumer might request a partition,
-		// which has not been registered yet.
-		if (producerSlot != null && (producerState == ExecutionState.RUNNING ||
-			producerState == ExecutionState.FINISHED)) {
-			if (producerSlot.getInstance().equals(consumerSlot.getInstance())) {
-				producerLocation = PartitionLocation.LOCAL;
-			}
-			else {
-				producerAddress = new RemoteAddress(producerSlot.getInstance().getInstanceConnectionInfo(),
-						partition.getIntermediateResult().getConnectionIndex());
-
-				producerLocation = PartitionLocation.REMOTE;
-			}
-		}
-
-		PartitionInfo partitionInfo = new PartitionInfo(partitionId, producerExecutionId,
-				producerLocation, producerAddress);
-
-		LOG.debug("Create partition info {}.", partitionInfo);
-
-		return partitionInfo;
-	}
-
-	public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, SimpleSlot consumerSlot) {
-		// Every edge consumes a different result partition, which might be of
-		// local, remote, or unknown location.
-		PartitionInfo[] partitions = new PartitionInfo[edges.length];
-
-		for (int i = 0; i < edges.length; i++) {
-			partitions[i] = fromEdge(edges[i], consumerSlot);
-		}
-
-		return partitions;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("PartitionInfo(PartitionID: %s, ProducerID: %s, " +
-				"ProducerLocation: %s, ProducerAddress: %s)", partitionId, producerExecutionId,
-				producerLocation, producerAddress);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
new file mode 100644
index 0000000..4a88f18
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Deployment descriptor for a result partition.
+ *
+ * @see ResultPartition
+ */
+public class ResultPartitionDeploymentDescriptor implements Serializable {
+
+	/** The ID of the result this partition belongs to. */
+	private final IntermediateDataSetID resultId;
+
+	/** The ID of the partition. */
+	private final IntermediateResultPartitionID partitionId;
+
+	/** The type of the partition. */
+	private final ResultPartitionType partitionType;
+
+	/** The number of subpartitions (always at least one, enforced by the constructor). */
+	private final int numberOfSubpartitions;
+
+	public ResultPartitionDeploymentDescriptor(
+			IntermediateDataSetID resultId,
+			IntermediateResultPartitionID partitionId,
+			ResultPartitionType partitionType,
+			int numberOfSubpartitions) {
+
+		this.resultId = checkNotNull(resultId);
+		this.partitionId = checkNotNull(partitionId);
+		this.partitionType = checkNotNull(partitionType);
+
+		checkArgument(numberOfSubpartitions >= 1);
+		this.numberOfSubpartitions = numberOfSubpartitions;
+	}
+
+	public IntermediateDataSetID getResultId() {
+		return resultId;
+	}
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	public ResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
+	public int getNumberOfSubpartitions() {
+		return numberOfSubpartitions;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("ResultPartitionDeploymentDescriptor [result id: %s, " +
+						"partition id: %s, partition type: %s]",
+				resultId, partitionId, partitionType);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) {
+		// Derives the descriptor fields from the partition and its owning intermediate result.
+		final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
+		final IntermediateResultPartitionID partitionId = partition.getPartitionId();
+		final ResultPartitionType partitionType = partition.getIntermediateResult().getResultType();
+
+		// The produced data is partitioned among a number of subpartitions.
+		//
+		// If no consumers are known at this point, we use a single subpartition, otherwise we have
+		// one for each consuming sub task.
+		int numberOfSubpartitions = 1;
+
+		if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
+			// Only the first consumer group sizes the partition below, so reject additional groups.
+			if (partition.getConsumers().size() > 1) {
+				throw new IllegalStateException("Currently, only a single consumer group per partition is supported.");
+			}
+
+			numberOfSubpartitions = partition.getConsumers().get(0).size();
+		}
+
+		return new ResultPartitionDeploymentDescriptor(
+				resultId, partitionId, partitionType, numberOfSubpartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
new file mode 100644
index 0000000..3922c22
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Location of a result partition from the perspective of the consuming task.
+ *
+ * <p> The location indicates both the instance, on which the partition is produced and the state of
+ * the producing task. There are three possibilities:
+ *
+ * <ol>
+ * <li><strong>Local:</strong> The partition is available at the same instance on which the
+ * consuming task is (being) deployed and the producing task has registered the result partition.
+ *
+ * <li><strong>Remote:</strong> The result partition is available at a different instance from the
+ * one, on which the consuming task is (being) deployed and the producing task has registered the
+ * result partition.
+ *
+ * <li><strong>Unknown:</strong> The producing task has not yet registered the result partition.
+ * When deploying the consuming task, the instance might be known or unknown. In any case, the
+ * consuming task cannot request it yet. Instead, it will be updated at runtime after the
+ * producing task is guaranteed to have registered the partition. A producing task is guaranteed
+ * to have registered the partition after its state has switched to running.
+ * </ol>
+ */
+public class ResultPartitionLocation implements Serializable {
+
+	/** The type of location for the result partition. */
+	private final LocationType locationType;
+
+	/** The connection ID of a remote result partition; null for local and unknown locations. */
+	private final ConnectionID connectionId;
+
+	private enum LocationType { // private: callers query it via isLocal()/isRemote()/isUnknown()
+		LOCAL,
+		REMOTE,
+		UNKNOWN
+	}
+
+	private ResultPartitionLocation(LocationType locationType, ConnectionID connectionId) {
+		this.locationType = checkNotNull(locationType);
+		this.connectionId = connectionId; // may be null: only createRemote() supplies a connection ID
+	}
+
+	public static ResultPartitionLocation createRemote(ConnectionID connectionId) {
+		return new ResultPartitionLocation(LocationType.REMOTE, checkNotNull(connectionId));
+	}
+
+	public static ResultPartitionLocation createLocal() {
+		return new ResultPartitionLocation(LocationType.LOCAL, null);
+	}
+
+	public static ResultPartitionLocation createUnknown() {
+		return new ResultPartitionLocation(LocationType.UNKNOWN, null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public boolean isLocal() {
+		return locationType == LocationType.LOCAL;
+	}
+
+	public boolean isRemote() {
+		return locationType == LocationType.REMOTE;
+	}
+
+	public boolean isUnknown() {
+		return locationType == LocationType.UNKNOWN;
+	}
+
+	public ConnectionID getConnectionId() {
+		return connectionId; // null unless this location was built by createRemote()
+	}
+
+	@Override
+	public String toString() {
+		return "ResultPartitionLocation [" + locationType + (isRemote() ? " [" + connectionId + "]]" : "]");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 6993248..f7518bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -49,47 +49,47 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	private final ExecutionAttemptID executionId;
 
 	/** The task's name. */
-	private String taskName;
+	private final String taskName;
 
 	/** The task's index in the subtask group. */
-	private int indexInSubtaskGroup;
+	private final int indexInSubtaskGroup;
 
 	/** The number of sub tasks. */
-	private int numberOfSubtasks;
+	private final int numberOfSubtasks;
 
 	/** The configuration of the job the task belongs to. */
-	private Configuration jobConfiguration;
+	private final Configuration jobConfiguration;
 
 	/** The task's configuration object. */
-	private Configuration taskConfiguration;
+	private final Configuration taskConfiguration;
 
 	/** The name of the class containing the task code to be executed. */
-	private String invokableClassName;
-
+	private final String invokableClassName;
 
 	/** The list of produced intermediate result partition deployment descriptors. */
-	private List<PartitionDeploymentDescriptor> producedPartitions;
+	private final List<ResultPartitionDeploymentDescriptor> producedPartitions;
 
 	/** The list of consumed intermediate result partitions. */
-	private List<PartitionConsumerDeploymentDescriptor> consumedPartitions;
+	private final List<InputGateDeploymentDescriptor> inputGates;
 
-	private int targetSlotNumber;
+	private final int targetSlotNumber;
 
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
-	
+
 	private StateHandle operatorStates;
 
+
 	/**
 	 * Constructs a task deployment descriptor.
 	 */
 	public TaskDeploymentDescriptor(
-			JobID jobID, JobVertexID vertexID,  ExecutionAttemptID executionId,  String taskName,
-			int indexInSubtaskGroup,  int numberOfSubtasks, Configuration jobConfiguration,
+			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName,
+			int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration,
 			Configuration taskConfiguration, String invokableClassName,
-			List<PartitionDeploymentDescriptor> producedPartitions,
-			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber){
+			List<ResultPartitionDeploymentDescriptor> producedPartitions,
+			List<InputGateDeploymentDescriptor> inputGates,
+			List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
 		this.jobID = checkNotNull(jobID);
 		this.vertexID = checkNotNull(vertexID);
@@ -103,37 +103,25 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.taskConfiguration = checkNotNull(taskConfiguration);
 		this.invokableClassName = checkNotNull(invokableClassName);
 		this.producedPartitions = checkNotNull(producedPartitions);
-		this.consumedPartitions = checkNotNull(consumedPartitions);
+		this.inputGates = checkNotNull(inputGates);
 		this.requiredJarFiles = checkNotNull(requiredJarFiles);
+		checkArgument(targetSlotNumber >= 0);
 		this.targetSlotNumber = targetSlotNumber;
 	}
 
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public TaskDeploymentDescriptor() {
-		this.jobID = new JobID();
-		this.vertexID = new JobVertexID();
-		this.executionId = new ExecutionAttemptID();
-		this.jobConfiguration = new Configuration();
-		this.taskConfiguration = new Configuration();
-		this.producedPartitions = new ArrayList<PartitionDeploymentDescriptor>();
-		this.consumedPartitions = new ArrayList<PartitionConsumerDeploymentDescriptor>();
-		this.requiredJarFiles = new ArrayList<BlobKey>();
-	}
-
 	public TaskDeploymentDescriptor(
 			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName,
 			int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration,
 			Configuration taskConfiguration, String invokableClassName,
-			List<PartitionDeploymentDescriptor> producedPartitions,
-			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber, StateHandle operatorStates) {
+			List<ResultPartitionDeploymentDescriptor> producedPartitions,
+			List<InputGateDeploymentDescriptor> inputGates,
+			List<BlobKey> requiredJarFiles, int targetSlotNumber,
+			StateHandle operatorStates) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-				consumedPartitions, requiredJarFiles, targetSlotNumber);
-		
+				inputGates, requiredJarFiles, targetSlotNumber);
+
 		setOperatorState(operatorStates);
 	}
 
@@ -164,7 +152,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/**
 	 * Returns the task's index in the subtask group.
-	 * 
+	 *
 	 * @return the task's index in the subtask group
 	 */
 	public int getIndexInSubtaskGroup() {
@@ -177,10 +165,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	public int getNumberOfSubtasks() {
 		return numberOfSubtasks;
 	}
-	
+
 	/**
 	 * Gets the number of the slot into which the task is to be deployed.
-	 * 
+	 *
 	 * @return The number of the target slot.
 	 */
 	public int getTargetSlotNumber() {
@@ -208,12 +196,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return invokableClassName;
 	}
 
-	public List<PartitionDeploymentDescriptor> getProducedPartitions() {
+	public List<ResultPartitionDeploymentDescriptor> getProducedPartitions() {
 		return producedPartitions;
 	}
 
-	public List<PartitionConsumerDeploymentDescriptor> getConsumedPartitions() {
-		return consumedPartitions;
+	public List<InputGateDeploymentDescriptor> getInputGates() {
+		return inputGates;
 	}
 
 	public List<BlobKey> getRequiredJarFiles() {
@@ -222,25 +210,26 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	@Override
 	public String toString() {
-		final StringBuilder pddBuilder = new StringBuilder("");
-		final StringBuilder pcddBuilder = new StringBuilder("");
+		return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " +
+						"execution id: %s, task name: %s (%d/%d), invokable: %s, " +
+						"produced partitions: %s, input gates: %s]",
+				jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
+				invokableClassName, collectionToString(producedPartitions),
+				collectionToString(inputGates));
+	}
 
-		for(PartitionDeploymentDescriptor pdd: producedPartitions) {
-			pddBuilder.append(pdd);
-		}
+	private String collectionToString(Collection<?> collection) {
+		final StringBuilder strBuilder = new StringBuilder();
+
+		strBuilder.append("[");
 
-		for(PartitionConsumerDeploymentDescriptor pcdd: consumedPartitions) {
-			pcddBuilder.append(pcdd);
+		for (Object elem : collection) {
+			strBuilder.append(elem.toString());
 		}
 
-		final String strProducedPartitions = pddBuilder.toString();
-		final String strConsumedPartitions = pcddBuilder.toString();
+		strBuilder.append("]");
 
-		return String.format("TaskDeploymentDescriptor(JobID: %s, JobVertexID: %s, " +
-				"ExecutionID: %s, Task name: %s, (%d/%d), Invokable: %s, " +
-				"Produced partitions: %s, Consumed partitions: %s", jobID, vertexID, executionId,
-				taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName,
-				strProducedPartitions, strConsumedPartitions);
+		return strBuilder.toString();
 	}
 
 	public void setOperatorState(StateHandle operatorStates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index fc2d2c8..556bb11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -130,9 +130,9 @@ public interface Environment {
 
 	BroadcastVariableManager getBroadcastVariableManager();
 
-	BufferWriter getWriter(int index);
+	ResultPartitionWriter getWriter(int index);
 
-	BufferWriter[] getAllWriters();
+	ResultPartitionWriter[] getAllWriters();
 
 	InputGate getInputGate(int index);
 


Mime
View raw message