flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [22/63] [abbrv] git commit: Unify all job vertices to one type (rather than dedicated input/output types)
Date Sun, 21 Sep 2014 02:12:46 GMT
Unify all job vertices to one type (rather than dedicated input/output types)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cb7039e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cb7039e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cb7039e3

Branch: refs/heads/master
Commit: cb7039e3e171474a7635a73dda9c086c84966dd0
Parents: e6aadfc
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jun 22 19:05:02 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |  25 ++-
 .../jobgraph/AbstractJobOutputVertex.java       |   2 -
 .../runtime/jobgraph/AbstractJobVertex.java     |  38 +----
 .../jobgraph/InputFormatInputVertex.java        | 103 ++++++++++++
 .../apache/flink/runtime/jobgraph/JobEdge.java  |   2 -
 .../apache/flink/runtime/jobgraph/JobGraph.java | 162 +++----------------
 .../flink/runtime/jobgraph/JobInputVertex.java  | 103 ------------
 .../flink/runtime/jobgraph/JobOutputVertex.java |  84 ----------
 .../jobgraph/OutputFormatOutputVertex.java      |  83 ++++++++++
 .../runtime/jobgraph/SimpleInputVertex.java     |  61 +++++++
 .../runtime/jobgraph/SimpleOutputVertex.java    |  53 ++++++
 .../jobgraph/tasks/AbstractInvokable.java       |  12 +-
 .../executiongraph/SelfCrossForwardTask.java    |   2 -
 .../BroadcastVarsNepheleITCase.java             |  22 +--
 .../KMeansIterativeNepheleITCase.java           |  31 ++--
 .../ConnectedComponentsNepheleITCase.java       |  67 ++++----
 .../IterationWithChainingNepheleITCase.java     |  13 +-
 .../test/iterative/nephele/JobGraphUtils.java   |  23 +--
 .../CustomCompensatableDanglingPageRank.java    |  16 +-
 ...mpensatableDanglingPageRankWithCombiner.java |  17 +-
 .../CompensatableDanglingPageRank.java          |  15 +-
 .../test/runtime/NetworkStackThroughput.java    |   8 +-
 22 files changed, 449 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 043a0a7..cb912de 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -66,11 +66,12 @@ import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.AbstractJobOutputVertex;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
@@ -808,8 +809,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		return vertex;
 	}
 
-	private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
-		final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
+	private AbstractJobVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
+		final InputFormatInputVertex vertex = new InputFormatInputVertex(node.getNodeName(), this.jobGraph);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSourceTask.class);
@@ -823,7 +824,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	}
 
 	private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
-		final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
+		final OutputFormatOutputVertex vertex = new OutputFormatOutputVertex(node.getNodeName(), this.jobGraph);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSinkTask.class);
@@ -1138,8 +1139,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
 		
 		// --------------------------- create the sync task ---------------------------
-		final JobOutputVertex sync = new JobOutputVertex("Sync(" +
-					bulkNode.getNodeName() + ")", this.jobGraph);
+		final SimpleOutputVertex sync = new SimpleOutputVertex("Sync(" + bulkNode.getNodeName() + ")", this.jobGraph);
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setNumberOfSubtasks(1);
 		this.auxVertices.add(sync);
@@ -1194,7 +1194,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
 			// create the fake output task
-			JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
+			SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
 			fakeTail.setInvokableClass(FakeOutputTask.class);
 			fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 			this.auxVertices.add(fakeTail);
@@ -1236,7 +1236,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 			tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
-			JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
+			SimpleOutputVertex fakeTailTerminationCriterion = new SimpleOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
 			fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
 			fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 			this.auxVertices.add(fakeTailTerminationCriterion);
@@ -1303,8 +1303,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		// --------------------------- create the sync task ---------------------------
 		final TaskConfig syncConfig;
 		{
-			final JobOutputVertex sync = new JobOutputVertex("Sync (" +
-						iterNode.getNodeName() + ")", this.jobGraph);
+			final SimpleOutputVertex sync = new SimpleOutputVertex("Sync (" + iterNode.getNodeName() + ")", this.jobGraph);
 			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 			sync.setNumberOfSubtasks(1);
 			this.auxVertices.add(sync);
@@ -1369,7 +1368,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 					
 					// create the fake output task
-					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
+					SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
 					fakeTail.setInvokableClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 					this.auxVertices.add(fakeTail);
@@ -1407,7 +1406,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 	
 					// create the fake output task
-					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
+					SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
 					fakeTail.setInvokableClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
 					this.auxVertices.add(fakeTail);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java
index c1f0ec5..edb8d0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobgraph;
 
 /**
  * An abstract base class for output vertices in Nephele.
- * 
  */
 public abstract class AbstractJobOutputVertex extends AbstractJobVertex {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 08a9567..7df76c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -72,14 +72,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	private int numberOfSubtasks = -1;
 
 	/**
-	 * Number of retries in case of an error before the task represented by this vertex is considered as failed.
-	 */
-	private int numberOfExecutionRetries = -1;
-
-	/**
 	 * Other task to share a (set of) of instances with at runtime.
 	 */
-	private AbstractJobVertex vertexToShareInstancesWith = null;
+	private AbstractJobVertex vertexToShareInstancesWith;
 
 	/**
 	 * Custom configuration passed to the assigned task at runtime.
@@ -89,7 +84,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	/**
 	 * The class of the invokable.
 	 */
-	protected Class<? extends AbstractInvokable> invokableClass = null;
+	protected Class<? extends AbstractInvokable> invokableClass;
 
 	
 	/**
@@ -388,9 +383,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 		// Read number of subtasks
 		this.numberOfSubtasks = in.readInt();
 
-		// Number of execution retries
-		this.numberOfExecutionRetries = in.readInt();
-
 		// Read vertex to share instances with
 		if (in.readBoolean()) {
 			final JobVertexID id = new JobVertexID();
@@ -464,9 +456,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 		// Number of subtasks
 		out.writeInt(this.numberOfSubtasks);
 
-		// Number of execution retries
-		out.writeInt(this.numberOfExecutionRetries);
-
 		// Vertex to share instance with
 		if (this.vertexToShareInstancesWith != null) {
 			out.writeBoolean(true);
@@ -538,29 +527,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	}
 
 	/**
-	 * Sets the number of retries in case of an error before the task represented by this vertex is considered as
-	 * failed.
-	 * 
-	 * @param numberOfExecutionRetries
-	 *        the number of retries in case of an error before the task represented by this vertex is considered as
-	 *        failed
-	 */
-	public void setNumberOfExecutionRetries(final int numberOfExecutionRetries) {
-		this.numberOfExecutionRetries = numberOfExecutionRetries;
-	}
-
-	/**
-	 * Returns the number of retries in case of an error before the task represented by this vertex is considered as
-	 * failed.
-	 * 
-	 * @return the number of retries in case of an error before the task represented by this vertex is considered as
-	 *         failed or <code>-1</code> if unspecified
-	 */
-	public int getNumberOfExecutionRetries() {
-		return this.numberOfExecutionRetries;
-	}
-
-	/**
 	 * Sets the vertex this vertex should share its instances with at runtime.
 	 * 
 	 * @param vertex

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java
new file mode 100644
index 0000000..f79264a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+
+
+public class InputFormatInputVertex extends AbstractJobInputVertex {
+	/** Input format that supplies the split type and the splits; stays <code>null</code> until one is set or loaded. */
+	private InputFormat<?, ?> inputFormat;
+	/** Creates an input vertex from the given name, vertex ID and job graph, all passed on to the superclass. */
+	public InputFormatInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+	
+	/**
+	 * Creates a new input vertex with the specified name, passing <code>null</code> as the vertex ID.
+	 * 
+	 * @param name
+	 *        The name of the new input vertex.
+	 * @param jobGraph
+	 *        The job graph this vertex belongs to.
+	 */
+	public InputFormatInputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+
+	/**
+	 * Creates a new input vertex, passing <code>null</code> as its name.
+	 * 
+	 * @param jobGraph
+	 *        The job graph this vertex belongs to.
+	 */
+	public InputFormatInputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}
+	/** Sets the input format that is consulted for the input split type and for creating the input splits. */
+	public void setInputFormat(InputFormat<?, ?> format) {
+		this.inputFormat = format;
+	}
+	/** Loads the input format from this vertex's task configuration via the given class loader and configures it. */
+	public void initializeInputFormatFromTaskConfig(ClassLoader cl) {
+		TaskConfig cfg = new TaskConfig(getConfiguration());
+		
+		UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(cl);
+		// a null wrapper leaves the currently held input format untouched
+		if (wrapper != null) {
+			this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl);
+			this.inputFormat.configure(cfg.getStubParameters());
+		}
+	}
+
+	/**
+	 * Returns the input split type reported by the input format.
+	 *
+	 * @return The input format's split type, or <code>InputSplit.class</code> if no input format is set
+	 */
+	@Override
+	public Class<? extends InputSplit> getInputSplitType() {
+		if (inputFormat == null){
+			return InputSplit.class;
+		}
+
+		return inputFormat.getInputSplitType();
+	}
+
+	/**
+	 * Creates the input splits by delegating to the input format.
+	 *
+	 * @param minNumSplits The minimum number of input splits, handed to the input format unchanged
+	 * @return The splits created by the input format, or <code>null</code> if no input format is set
+	 * @throws IOException Forwarded from the input format's split creation
+	 */
+	@Override
+	public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
+		if (inputFormat == null){
+			return null;
+		}
+
+		return inputFormat.createInputSplits(minNumSplits);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 0a5df3a..33b6576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.runtime.io.network.channels.ChannelType;
@@ -24,7 +23,6 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
 /**
  * Objects of this class represent edges in the user's job graph.
  * The edges can be annotated by a specific channel and compression level.
- * 
  */
 public class JobEdge {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2040c8e..48d858a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,24 +47,13 @@ import org.apache.flink.util.ClassUtils;
 /**
  * A job graph represents an entire job in Nephele. A job graph must consists at least of one job vertex
  * and must be acyclic.
- * 
  */
 public class JobGraph implements IOReadableWritable {
 
 	/**
-	 * List of input vertices included in this job graph.
-	 */
-	private Map<JobVertexID, AbstractJobInputVertex> inputVertices = new HashMap<JobVertexID, AbstractJobInputVertex>();
-
-	/**
-	 * List of output vertices included in this job graph.
-	 */
-	private Map<JobVertexID, AbstractJobOutputVertex> outputVertices = new HashMap<JobVertexID, AbstractJobOutputVertex>();
-
-	/**
 	 * List of task vertices included in this job graph.
 	 */
-	private Map<JobVertexID, JobTaskVertex> taskVertices = new HashMap<JobVertexID, JobTaskVertex>();
+	private Map<JobVertexID, AbstractJobVertex> taskVertices = new LinkedHashMap<JobVertexID, AbstractJobVertex>();
 
 	/**
 	 * ID of this job.
@@ -90,11 +80,8 @@ public class JobGraph implements IOReadableWritable {
 	 */
 	private static final int BUFFERSIZE = 8192;
 
-	/**
-	 * Buffer for array of reachable job vertices
-	 */
-	private volatile AbstractJobVertex[] bufferedAllReachableJobVertices = null;
-
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Constructs a new job graph with a random job ID.
 	 */
@@ -108,7 +95,7 @@ public class JobGraph implements IOReadableWritable {
 	 * @param jobName
 	 *        the name for this job graph
 	 */
-	public JobGraph(final String jobName) {
+	public JobGraph(String jobName) {
 		this();
 		this.jobName = jobName;
 	}
@@ -128,65 +115,27 @@ public class JobGraph implements IOReadableWritable {
 	 * @return the configuration object for this job, or <code>null</code> if it is not set
 	 */
 	public Configuration getJobConfiguration() {
-
 		return this.jobConfiguration;
 	}
 
 	/**
-	 * Adds a new input vertex to the job graph if it is not already included.
-	 * 
-	 * @param inputVertex
-	 *        the new input vertex to be added
-	 */
-	public void addVertex(AbstractJobInputVertex inputVertex) {
-		if (!inputVertices.containsKey(inputVertex.getID())) {
-			inputVertices.put(inputVertex.getID(), inputVertex);
-		}
-	}
-
-	/**
 	 * Adds a new task vertex to the job graph if it is not already included.
 	 * 
 	 * @param taskVertex
 	 *        the new task vertex to be added
 	 */
-	public void addVertex(JobTaskVertex taskVertex) {
-		if (!taskVertices.containsKey(taskVertex.getID())) {
-			taskVertices.put(taskVertex.getID(), taskVertex);
-		}
-	}
-
-	/**
-	 * Adds a new output vertex to the job graph if it is not already included.
-	 * 
-	 * @param outputVertex
-	 *        the new output vertex to be added
-	 */
-	public void addVertex(AbstractJobOutputVertex outputVertex) {
-		if (!outputVertices.containsKey(outputVertex.getID())) {
-			outputVertices.put(outputVertex.getID(), outputVertex);
+	public void addVertex(AbstractJobVertex vertex) {
+		final JobVertexID id = vertex.getID();
+		AbstractJobVertex previous = taskVertices.put(id, vertex);
+		
+		// a prior association means the id is taken: put the original vertex back, then reject the duplicate
+		if (previous != null) {
+			taskVertices.put(id, previous);
+			throw new IllegalArgumentException("The JobGraph already contains a vertex with that id.");
 		}
 	}
 
 	/**
-	 * Returns the number of input vertices registered with the job graph.
-	 * 
-	 * @return the number of input vertices registered with the job graph
-	 */
-	public int getNumberOfInputVertices() {
-		return this.inputVertices.size();
-	}
-
-	/**
-	 * Returns the number of output vertices registered with the job graph.
-	 * 
-	 * @return the number of output vertices registered with the job graph
-	 */
-	public int getNumberOfOutputVertices() {
-		return this.outputVertices.size();
-	}
-
-	/**
 	 * Returns the number of task vertices registered with the job graph.
 	 * 
 	 * @return the number of task vertices registered with the job graph
@@ -196,39 +145,12 @@ public class JobGraph implements IOReadableWritable {
 	}
 
 	/**
-	 * Returns an iterator to iterate all input vertices registered with the job graph.
-	 * 
-	 * @return an iterator to iterate all input vertices registered with the job graph
-	 */
-	public Iterator<AbstractJobInputVertex> getInputVertices() {
-
-		final Collection<AbstractJobInputVertex> coll = this.inputVertices.values();
-
-		return coll.iterator();
-	}
-
-	/**
-	 * Returns an iterator to iterate all output vertices registered with the job graph.
-	 * 
-	 * @return an iterator to iterate all output vertices registered with the job graph
-	 */
-	public Iterator<AbstractJobOutputVertex> getOutputVertices() {
-
-		final Collection<AbstractJobOutputVertex> coll = this.outputVertices.values();
-
-		return coll.iterator();
-	}
-
-	/**
-	 * Returns an iterator to iterate all task vertices registered with the job graph.
+	 * Returns an Iterable to iterate all vertices registered with the job graph.
 	 * 
-	 * @return an iterator to iterate all task vertices registered with the job graph
+	 * @return an Iterable to iterate all vertices registered with the job graph
 	 */
-	public Iterator<JobTaskVertex> getTaskVertices() {
-
-		final Collection<JobTaskVertex> coll = this.taskVertices.values();
-
-		return coll.iterator();
+	public Iterable<AbstractJobVertex> getTaskVertices() {
+		return this.taskVertices.values();
 	}
 
 	/**
@@ -237,35 +159,7 @@ public class JobGraph implements IOReadableWritable {
 	 * @return the number of all job vertices registered with this job graph
 	 */
 	public int getNumberOfVertices() {
-
-		return this.inputVertices.size() + this.outputVertices.size() + this.taskVertices.size();
-	}
-
-	/**
-	 * Returns an array of all job vertices than can be reached when traversing the job graph from the input vertices.
-	 * Each job vertex is contained only one time.
-	 * 
-	 * @return an array of all job vertices than can be reached when traversing the job graph from the input vertices
-	 */
-	public AbstractJobVertex[] getAllReachableJobVertices() {
-		if(bufferedAllReachableJobVertices == null){
-			final List<AbstractJobVertex> collector = new ArrayList<AbstractJobVertex>();
-			final HashSet<JobVertexID> visited = new HashSet<JobVertexID>();
-
-			final Iterator<AbstractJobInputVertex> inputs = getInputVertices();
-
-			while(inputs.hasNext()){
-				AbstractJobVertex vertex = inputs.next();
-
-				if(!visited.contains(vertex.getID())){
-					collectVertices(vertex, visited, collector);
-				}
-			}
-
-			bufferedAllReachableJobVertices = collector.toArray(new AbstractJobVertex[0]);
-		}
-
-		return bufferedAllReachableJobVertices;
+		return this.taskVertices.size();
 	}
 
 	/**
@@ -297,7 +191,8 @@ public class JobGraph implements IOReadableWritable {
 	 * @return an array of all job vertices that are registered with the job graph
 	 */
 	public AbstractJobVertex[] getAllJobVertices() {
-
+		return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]);
+		
 		int i = 0;
 		final AbstractJobVertex[] vertices = new AbstractJobVertex[inputVertices.size() + outputVertices.size()
 			+ taskVertices.size()];
@@ -337,21 +232,8 @@ public class JobGraph implements IOReadableWritable {
 	 *        the ID of the vertex to search for
 	 * @return the vertex with the matching ID or <code>null</code> if no vertex with such ID could be found
 	 */
-	public AbstractJobVertex findVertexByID(final JobVertexID id) {
-
-		if (this.inputVertices.containsKey(id)) {
-			return this.inputVertices.get(id);
-		}
-
-		if (this.outputVertices.containsKey(id)) {
-			return this.outputVertices.get(id);
-		}
-
-		if (this.taskVertices.containsKey(id)) {
-			return this.taskVertices.get(id);
-		}
-
-		return null;
+	public AbstractJobVertex findVertexByID(JobVertexID id) {
+		return this.taskVertices.get(id);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java
deleted file mode 100644
index bffb182..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobgraph;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-public class JobInputVertex extends AbstractJobInputVertex {
-
-	private InputFormat<?, ?> inputFormat;
-	
-	public JobInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-	
-	/**
-	 * Creates a new job file input vertex with the specified name.
-	 * 
-	 * @param name
-	 *        The name of the new job file input vertex.
-	 * @param jobGraph
-	 *        The job graph this vertex belongs to.
-	 */
-	public JobInputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        The job graph this vertex belongs to.
-	 */
-	public JobInputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-	
-	public void setInputFormat(InputFormat<?, ?> format) {
-		this.inputFormat = format;
-	}
-	
-	public void initializeInputFormatFromTaskConfig(ClassLoader cl) {
-		TaskConfig cfg = new TaskConfig(getConfiguration());
-		
-		UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(cl);
-		
-		if (wrapper != null) {
-			this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl);
-			this.inputFormat.configure(cfg.getStubParameters());
-		}
-	}
-
-	/**
-	 * Gets the input split type class
-	 *
-	 * @return Input split type class
-	 */
-	@Override
-	public Class<? extends InputSplit> getInputSplitType() {
-		if (inputFormat == null){
-			return InputSplit.class;
-		}
-
-		return inputFormat.getInputSplitType();
-	}
-
-	/**
-	 * Gets the input splits from the input format.
-	 *
-	 * @param minNumSplits Number of minimal input splits
-	 * @return Array of input splits
-	 * @throws IOException
-	 */
-	@Override
-	public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
-		if (inputFormat == null){
-			return null;
-		}
-
-		return inputFormat.createInputSplits(minNumSplits);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java
deleted file mode 100644
index 352d9b3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobgraph;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-/**
- * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobOutputVertex must not have any further output.
- */
-public class JobOutputVertex extends AbstractJobOutputVertex {
-	/**
-	 * Contains the output format associated to this output vertex. It can be <pre>null</pre>.
-	 */
-	private OutputFormat<?> outputFormat;
-
-
-	/**
-	 * Creates a new job file output vertex with the specified name.
-	 * 
-	 * @param name
-	 *        the name of the new job file output vertex
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobOutputVertex(String name, JobGraph jobGraph) {
-		this(name, null, jobGraph);
-	}
-	
-	public JobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
-		super(name, id, jobGraph);
-	}
-
-	/**
-	 * Creates a new job file input vertex.
-	 * 
-	 * @param jobGraph
-	 *        the job graph this vertex belongs to
-	 */
-	public JobOutputVertex(JobGraph jobGraph) {
-		this(null, jobGraph);
-	}
-	
-	public void setOutputFormat(OutputFormat<?> format) {
-		this.outputFormat = format;
-	}
-	
-	public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
-		TaskConfig cfg = new TaskConfig(getConfiguration());
-		UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
-		
-		if (wrapper != null) {
-			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
-			this.outputFormat.configure(cfg.getStubParameters());
-		}
-	}
-
-	/**
-	 * Returns the output format. It can also be <pre>null</pre>.
-	 *
-	 * @return output format or <pre>null</pre>
-	 */
-	public OutputFormat<?> getOutputFormat() { return outputFormat; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
new file mode 100644
index 0000000..08a03bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+
+/**
+ * An OutputFormatOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
+ * for Nephele tasks which sink data, optionally via an associated {@link OutputFormat}. As every job output vertex,
+ * an OutputFormatOutputVertex must not have any further output.
+ */
+public class OutputFormatOutputVertex extends AbstractJobOutputVertex {
+	/**
+	 * Contains the output format associated to this output vertex. It can be {@code null}.
+	 */
+	private OutputFormat<?> outputFormat;
+
+
+	/**
+	 * Creates a new job file output vertex with the specified name.
+	 * 
+	 * @param name
+	 *        the name of the new job file output vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public OutputFormatOutputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	public OutputFormatOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+
+	/**
+	 * Creates a new job file output vertex without a name.
+	 * 
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public OutputFormatOutputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}
+	
+	public void setOutputFormat(OutputFormat<?> format) {
+		this.outputFormat = format;
+	}
+	
+	public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
+		TaskConfig cfg = new TaskConfig(getConfiguration());
+		UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
+		
+		if (wrapper != null) {
+			this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
+			this.outputFormat.configure(cfg.getStubParameters());
+		}
+	}
+
+	/**
+	 * Returns the output format. It can also be {@code null}.
+	 *
+	 * @return output format or {@code null}
+	 */
+	public OutputFormat<?> getOutputFormat() { return outputFormat; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
new file mode 100644
index 0000000..3699f0e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.core.io.InputSplit;
+
+
+public class SimpleInputVertex extends AbstractJobInputVertex {
+
+	/**
+	 * Creates a new job file input vertex with the specified name.
+	 * 
+	 * @param name
+	 *        the name of the new job file input vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public SimpleInputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	public SimpleInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+
+	/**
+	 * Creates a new job file input vertex.
+	 * 
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public SimpleInputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}
+
+	@Override
+	public Class<? extends InputSplit> getInputSplitType() {
+		return null;
+	}
+
+	@Override
+	public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
+		return null;
+	}	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
new file mode 100644
index 0000000..8709a07
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+/**
+ * A SimpleOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
+ * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
+ * a SimpleOutputVertex must not have any further output.
+ */
+public class SimpleOutputVertex extends AbstractJobOutputVertex {
+
+	/**
+	 * Creates a new job file output vertex with the specified name.
+	 * 
+	 * @param name
+	 *        the name of the new job file output vertex
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public SimpleOutputVertex(String name, JobGraph jobGraph) {
+		this(name, null, jobGraph);
+	}
+	
+	public SimpleOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+		super(name, id, jobGraph);
+	}
+
+	/**
+	 * Creates a new job file output vertex without a name.
+	 * 
+	 * @param jobGraph
+	 *        the job graph this vertex belongs to
+	 */
+	public SimpleOutputVertex(JobGraph jobGraph) {
+		this(null, jobGraph);
+	}	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 25bb027..d3ad516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.configuration.Configuration;
@@ -24,7 +23,6 @@ import org.apache.flink.runtime.execution.Environment;
 
 /**
  * Abstract base class for every task class in Nephele.
- * 
  */
 public abstract class AbstractInvokable {
 
@@ -42,7 +40,7 @@ public abstract class AbstractInvokable {
 	 * Must be overwritten by the concrete task. This method is called by the task manager
 	 * when the actual execution of the task starts.
 	 * 
-	 * @throws Execution
+	 * @throws Exception
 	 *         thrown if any exception occurs during the execution of the tasks
 	 */
 	public abstract void invoke() throws Exception;
@@ -89,9 +87,9 @@ public abstract class AbstractInvokable {
 	}
 
 	/**
-	 * Returns the task configuration object which was attached to the original {@link JobVertex}.
+	 * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}.
 	 * 
-	 * @return the task configuration object which was attached to the original {@link JobVertex}
+	 * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}
 	 */
 	public final Configuration getTaskConfiguration() {
 
@@ -99,9 +97,9 @@ public abstract class AbstractInvokable {
 	}
 
 	/**
-	 * Returns the job configuration object which was attached to the original {@link JobGraph}.
+	 * Returns the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}.
 	 * 
-	 * @return the job configuration object which was attached to the original {@link JobGraph}
+	 * @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
 	 */
 	public final Configuration getJobConfiguration() {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
index c181b58..7fafeda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.executiongraph;
 
-
 import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.runtime.io.network.api.RecordReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index 68f6496..a409222 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -35,11 +35,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -227,9 +227,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	// -------------------------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
-	private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
+		InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -241,9 +241,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static JobInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class);
-		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
+		InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -278,8 +278,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		return pointsInput;
 	}
 
-	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -308,10 +308,10 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Distance Builder");
 
 		// -- vertices ---------------------------------------------------------------------------------------------
-		JobInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		JobInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
+		InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+		InputFormatInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
 		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 
 		// -- edges ------------------------------------------------------------------------------------------------
 		JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 77d68f8..4d46b16 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -32,11 +32,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -95,10 +96,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	// Job vertex builder methods
 	// -------------------------------------------------------------------------------------------------------------
 
-	private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
+		InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks);
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
 			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -116,10 +117,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return pointsInput;
 	}
 
-	private static JobInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static InputFormatInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class);
-		JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
+		InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
@@ -138,9 +139,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return modelsInput;
 	}
 
-	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
+	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
 		
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
@@ -254,8 +255,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return tail;
 	}
 	
-	private static JobOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+	private static SimpleOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.setIterationId(ITERATION_ID);
@@ -276,19 +277,19 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("KMeans Iterative");
 
 		// -- vertices ---------------------------------------------------------------------------------------------
-		JobInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		JobInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
+		InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
+		InputFormatInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
 		
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
 		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
 		
 		JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
 		
-		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
+		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		
-		JobOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks);
+		SimpleOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks);
 		
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 
 		// -- edges ------------------------------------------------------------------------------------------------
 		JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 4fd22a3..8da4e5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -44,11 +44,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -173,12 +174,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 	// Invariant vertices across all variants
 	// -----------------------------------------------------------------------------------------------------------------
 
-	private static JobInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
+	private static InputFormatInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer,
 			TypeComparatorFactory<?> comparator) {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
-		JobInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
+		InputFormatInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
 			jobGraph, numSubTasks);
 		TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
 		{
@@ -204,13 +205,13 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return verticesInput;
 	}
 
-	private static JobInputVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks,
+	private static InputFormatInputVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer,
 			TypeComparatorFactory<?> comparator) {
 		// edges
 		@SuppressWarnings("unchecked")
 		CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class);
-		JobInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
+		InputFormatInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
 			numSubTasks);
 		TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration());
 		{
@@ -326,9 +327,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return intermediate;
 	}
 
-	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
+	private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer) {
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 
@@ -351,14 +352,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return output;
 	}
 
-	private static JobOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
-		JobOutputVertex fakeTailOutput =
+	private static SimpleOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
+		SimpleOutputVertex fakeTailOutput =
 			JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		return fakeTailOutput;
 	}
 
-	private static JobOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
+	private static SimpleOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(maxIterations);
 		syncConfig.setIterationId(ITERATION_ID);
@@ -388,16 +389,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
 
 		// -- invariant vertices -----------------------------------------------------------------------------------
-		JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
 
 		JobTaskVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// --------------- the tail (solution set join) ---------------
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
@@ -472,8 +473,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
 
 		// input
-		JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -485,10 +486,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
-		JobOutputVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
-		JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		SimpleOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
+		SimpleOutputVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
+		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss join) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
@@ -623,8 +624,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
 
 		// input
-		JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -636,9 +637,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ws update) ----------------------
 		JobTaskVertex wsUpdateIntermediate =
@@ -749,8 +750,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");
 
 		// input
-		JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
+		InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
 
 		// head
 		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
@@ -760,9 +761,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		// output and auxiliaries
-		JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
-		JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
+		OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
+		SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
+		SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss update) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 5a6e4f5..8246d22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -33,11 +33,12 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -130,7 +131,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		// --------------------------------------------------------------------------------------------------------------
 
 		// - input -----------------------------------------------------------------------------------------------------
-		JobInputVertex input = JobGraphUtils.createInput(
+		InputFormatInputVertex input = JobGraphUtils.createInput(
 			new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
 		TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
 		{
@@ -213,7 +214,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		}
 
 		// - output ----------------------------------------------------------------------------------------------------
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 			outputConfig.addInputToGroup(0);
@@ -224,10 +225,10 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		}
 
 		// - fake tail -------------------------------------------------------------------------------------------------
-		JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
+		SimpleOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
 
 		// - sync ------------------------------------------------------------------------------------------------------
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(maxIterations);
 		syncConfig.setIterationId(ITERATION_ID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 4370111..052c7ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -33,11 +33,12 @@ import org.apache.flink.runtime.iterative.io.FakeOutputTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -55,17 +56,17 @@ public class JobGraphUtils {
 		client.submitJobAndWait();
 	}
 	
-	public static <T extends FileInputFormat<?>> JobInputVertex createInput(T stub, String path, String name, JobGraph graph,
+	public static <T extends FileInputFormat<?>> InputFormatInputVertex createInput(T stub, String path, String name, JobGraph graph,
 			int degreeOfParallelism)
 	{
 		stub.setFilePath(path);
 		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
 	}
 
-	private static <T extends InputFormat<?,?>> JobInputVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
+	private static <T extends InputFormat<?,?>> InputFormatInputVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
 			int degreeOfParallelism)
 	{
-		JobInputVertex inputVertex = new JobInputVertex(name, graph);
+		InputFormatInputVertex inputVertex = new InputFormatInputVertex(name, graph);
 		
 		inputVertex.setInvokableClass(DataSourceTask.class);
 		
@@ -99,8 +100,8 @@ public class JobGraphUtils {
 		return taskVertex;
 	}
 
-	public static JobOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
-		JobOutputVertex sync = new JobOutputVertex("BulkIterationSync", jobGraph);
+	public static SimpleOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
+		SimpleOutputVertex sync = new SimpleOutputVertex("BulkIterationSync", jobGraph);
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setNumberOfSubtasks(1);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -108,17 +109,17 @@ public class JobGraphUtils {
 		return sync;
 	}
 
-	public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
+	public static SimpleOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
-		JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
+		SimpleOutputVertex outputVertex = new SimpleOutputVertex(name, jobGraph);
 		outputVertex.setInvokableClass(FakeOutputTask.class);
 		outputVertex.setNumberOfSubtasks(degreeOfParallelism);
 		return outputVertex;
 	}
 
-	public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
+	public static OutputFormatOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
-		JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
+		OutputFormatOutputVertex sinkVertex = new OutputFormatOutputVertex(name, jobGraph);
 		sinkVertex.setInvokableClass(DataSinkTask.class);
 		sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
 		return sinkVertex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 85cedba..aea2c2c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -31,10 +30,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRank {
 		// --------------- the inputs ---------------------
 
 		// page rank input
-		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
+		InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
 			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRank {
 		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
 
 		// edges as adjacency list
-		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
+		InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
 			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -267,7 +267,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -276,10 +276,10 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
+		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
 			degreeOfParallelism);
 
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index c60f905..a740cf3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -31,10 +30,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// --------------- the inputs ---------------------
 
 		// page rank input
-		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
+		InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
 			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
 
 		// edges as adjacency list
-		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
+		InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
 			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
@@ -279,7 +279,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -288,10 +288,9 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the auxiliaries ---------------------
 		
-		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
-			degreeOfParallelism);
+		SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
 
-		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());


Mime
View raw message