flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/3] flink git commit: [FLINK-2120][runtime] rename AbstractJobVertex to JobVertex
Date Tue, 16 Jun 2015 15:20:09 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 894a7a9..a4bd03c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -79,11 +79,11 @@ public class ExecutionGraphConstructionTest {
 		final String jobName = "Test Job Sample Name";
 		final Configuration cfg = new Configuration();
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
-		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
 		
 		v1.setParallelism(5);
 		v2.setParallelism(7);
@@ -97,7 +97,7 @@ public class ExecutionGraphConstructionTest {
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
 		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
 		
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -118,9 +118,9 @@ public class ExecutionGraphConstructionTest {
 		final Configuration cfg = new Configuration();
 		
 		// construct part one of the execution graph
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
 		
 		v1.setParallelism(5);
 		v2.setParallelism(7);
@@ -135,7 +135,7 @@ public class ExecutionGraphConstructionTest {
 		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		
 		
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -148,8 +148,8 @@ public class ExecutionGraphConstructionTest {
 		
 		// attach the second part of the graph
 		
-		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
 		v4.setParallelism(11);
 		v5.setParallelism(4);
 		
@@ -158,7 +158,7 @@ public class ExecutionGraphConstructionTest {
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
 		v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL);
 		
-		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
 		
 		try {
 			eg.attachJobGraph(ordered2);
@@ -179,9 +179,9 @@ public class ExecutionGraphConstructionTest {
 		final Configuration cfg = new Configuration();
 		
 		// construct part one of the execution graph
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
 		
 		v1.setParallelism(5);
 		v2.setParallelism(7);
@@ -196,7 +196,7 @@ public class ExecutionGraphConstructionTest {
 		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		
 		
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -209,8 +209,8 @@ public class ExecutionGraphConstructionTest {
 		
 		// attach the second part of the graph
 		
-		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
 		v4.setParallelism(11);
 		v5.setParallelism(4);
 		
@@ -219,7 +219,7 @@ public class ExecutionGraphConstructionTest {
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
 		v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL);
 		
-		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
 		
 		try {
 			eg.attachJobGraph(ordered2);
@@ -234,8 +234,8 @@ public class ExecutionGraphConstructionTest {
 	}
 	
 	private void verifyTestGraph(ExecutionGraph eg, JobID jobId,
-				AbstractJobVertex v1, AbstractJobVertex v2, AbstractJobVertex v3,
-				AbstractJobVertex v4, AbstractJobVertex v5)
+				JobVertex v1, JobVertex v2, JobVertex v3,
+				JobVertex v4, JobVertex v5)
 	{
 		Map<JobVertexID, ExecutionJobVertex> vertices = eg.getAllVertices();
 		
@@ -441,10 +441,10 @@ public class ExecutionGraphConstructionTest {
 		final Configuration cfg = new Configuration();
 		
 		// construct part one of the execution graph
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		JobVertex v1 = new JobVertex("vertex1");
 		v1.setParallelism(7);
 		
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -456,10 +456,10 @@ public class ExecutionGraphConstructionTest {
 		}
 		
 		// attach the second part of the graph
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v2 = new JobVertex("vertex2");
 		v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);
 		
-		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v2));
+		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v2));
 		
 		try {
 			eg.attachJobGraph(ordered2);
@@ -476,11 +476,11 @@ public class ExecutionGraphConstructionTest {
 		final String jobName = "Test Job Sample Name";
 		final Configuration cfg = new Configuration();
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
-		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
 		
 		v1.setParallelism(5);
 		v2.setParallelism(7);
@@ -494,7 +494,7 @@ public class ExecutionGraphConstructionTest {
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
 		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
 		
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -528,11 +528,11 @@ public class ExecutionGraphConstructionTest {
 			final String jobName = "Test Job Sample Name";
 			final Configuration cfg = new Configuration();
 			
-			AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-			AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-			AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
-			AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-			AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+			JobVertex v1 = new JobVertex("vertex1");
+			JobVertex v2 = new JobVertex("vertex2");
+			JobVertex v3 = new JobVertex("vertex3");
+			JobVertex v4 = new JobVertex("vertex4");
+			JobVertex v5 = new JobVertex("vertex5");
 			
 			v1.setParallelism(5);
 			v2.setParallelism(7);
@@ -549,7 +549,7 @@ public class ExecutionGraphConstructionTest {
 			v3.setInputSplitSource(source1);
 			v5.setInputSplitSource(source2);
 			
-			List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
 					AkkaUtils.getDefaultTimeout());
@@ -577,9 +577,9 @@ public class ExecutionGraphConstructionTest {
 			final String jobName = "Test Job Sample Name";
 			final Configuration cfg = new Configuration();
 			
-			AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-			AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
-			AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+			JobVertex v1 = new JobVertex("vertex1");
+			JobVertex v2 = new JobVertex("vertex2");
+			JobVertex v3 = new JobVertex("vertex3");
 			
 			v1.setParallelism(5);
 			v2.setParallelism(7);
@@ -589,7 +589,7 @@ public class ExecutionGraphConstructionTest {
 			v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
 			v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
 			
-			List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
 					AkkaUtils.getDefaultTimeout());
@@ -616,8 +616,8 @@ public class ExecutionGraphConstructionTest {
 			final Configuration cfg = new Configuration();
 			
 			// simple group of two, cyclic
-			AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-			AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+			JobVertex v1 = new JobVertex("vertex1");
+			JobVertex v2 = new JobVertex("vertex2");
 			v1.setParallelism(6);
 			v2.setParallelism(4);
 			
@@ -628,11 +628,11 @@ public class ExecutionGraphConstructionTest {
 			v1.setStrictlyCoLocatedWith(v2);
 			
 			// complex forked dependency pattern
-			AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
-			AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
-			AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
-			AbstractJobVertex v6 = new AbstractJobVertex("vertex6");
-			AbstractJobVertex v7 = new AbstractJobVertex("vertex7");
+			JobVertex v3 = new JobVertex("vertex3");
+			JobVertex v4 = new JobVertex("vertex4");
+			JobVertex v5 = new JobVertex("vertex5");
+			JobVertex v6 = new JobVertex("vertex6");
+			JobVertex v7 = new JobVertex("vertex7");
 			v3.setParallelism(3);
 			v4.setParallelism(3);
 			v5.setParallelism(3);
@@ -652,7 +652,7 @@ public class ExecutionGraphConstructionTest {
 			v3.setStrictlyCoLocatedWith(v7);
 			
 			// isolated vertex
-			AbstractJobVertex v8 = new AbstractJobVertex("vertex8");
+			JobVertex v8 = new JobVertex("vertex8");
 			v8.setParallelism(2);
 			
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index a53c318..03a41b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,10 +81,10 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid3 = new JobVertexID();
 			final JobVertexID jid4 = new JobVertexID();
 
-			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
-			AbstractJobVertex v3 = new AbstractJobVertex("v3", jid3);
-			AbstractJobVertex v4 = new AbstractJobVertex("v4", jid4);
+			JobVertex v1 = new JobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
+			JobVertex v3 = new JobVertex("v3", jid3);
+			JobVertex v4 = new JobVertex("v4", jid4);
 
 			v1.setParallelism(10);
 			v2.setParallelism(10);
@@ -103,7 +103,7 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(),
 					AkkaUtils.getDefaultTimeout());
 
-			List<AbstractJobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
 			eg.attachJobGraph(ordered);
 
@@ -163,8 +163,8 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
 
-			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			JobVertex v1 = new JobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
 
 			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);
 
@@ -187,8 +187,8 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
 
-			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			JobVertex v1 = new JobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
 
 			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
 
@@ -211,8 +211,8 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
 
-			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			JobVertex v1 = new JobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
 
 			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
 
@@ -235,8 +235,8 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
 
-			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			JobVertex v1 = new JobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
 
 			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);
 
@@ -260,8 +260,8 @@ public class ExecutionGraphDeploymentTest {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
 
-			AbstractJobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
-			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			JobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
+			JobVertex v2 = new JobVertex("v2", jid2);
 
 			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4);
 
@@ -297,7 +297,7 @@ public class ExecutionGraphDeploymentTest {
 		}
 	}
 
-	private Map<ExecutionAttemptID, Execution> setupExecution(AbstractJobVertex v1, int dop1, AbstractJobVertex v2, int dop2) throws Exception {
+	private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 
 		v1.setParallelism(dop1);
@@ -311,7 +311,7 @@ public class ExecutionGraphDeploymentTest {
 				AkkaUtils.getDefaultTimeout());
 		eg.setQueuedSchedulingAllowed(false);
 
-		List<AbstractJobVertex> ordered = Arrays.asList(v1, v2);
+		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
 		// create a mock taskmanager that accepts deployment calls
@@ -333,7 +333,7 @@ public class ExecutionGraphDeploymentTest {
 	}
 
 	@SuppressWarnings("serial")
-	public static class FailingFinalizeJobVertex extends AbstractJobVertex {
+	public static class FailingFinalizeJobVertex extends JobVertex {
 
 		public FailingFinalizeJobVertex(String name) {
 			super(name);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a1ee79b..a77a09e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -147,7 +147,7 @@ public class ExecutionGraphTestUtils {
 	}
 	
 	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
-		AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", id);
+		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 		
 		ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 733ad11..7787ab4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -31,7 +31,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,7 +61,7 @@ public class ExecutionStateProgressTest {
 			final JobID jid = new JobID();
 			final JobVertexID vid = new JobVertexID();
 
-			AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", vid);
+			JobVertex ajv = new JobVertex("TestVertex", vid);
 			ajv.setParallelism(3);
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index f0001a9..0d2ffeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -283,7 +283,7 @@ public class LocalInputSplitsTest {
 		};
 		
 		try {
-			AbstractJobVertex vertex = new AbstractJobVertex("test vertex");
+			JobVertex vertex = new JobVertex("test vertex");
 			vertex.setParallelism(6);
 			vertex.setInvokableClass(DummyInvokable.class);
 			vertex.setInputSplitSource(new TestInputSplitSource(splits));
@@ -342,7 +342,7 @@ public class LocalInputSplitsTest {
 			TestLocatableInputSplit[] splits)
 		throws Exception
 	{
-		AbstractJobVertex vertex = new AbstractJobVertex("test vertex");
+		JobVertex vertex = new JobVertex("test vertex");
 		vertex.setParallelism(parallelism);
 		vertex.setInvokableClass(DummyInvokable.class);
 		vertex.setInputSplitSource(new TestInputSplitSource(splits));

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index f72d105..4677bf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -31,7 +31,7 @@ import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 
@@ -46,15 +46,15 @@ public class PointwisePatternTest {
 	public void testNToN() {
 		final int N = 23;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(N);
 		v2.setParallelism(N);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -81,15 +81,15 @@ public class PointwisePatternTest {
 	public void test2NToN() {
 		final int N = 17;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(2 * N);
 		v2.setParallelism(N);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -117,15 +117,15 @@ public class PointwisePatternTest {
 	public void test3NToN() {
 		final int N = 17;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(3 * N);
 		v2.setParallelism(N);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -154,15 +154,15 @@ public class PointwisePatternTest {
 	public void testNTo2N() {
 		final int N = 41;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(N);
 		v2.setParallelism(2 * N);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -189,15 +189,15 @@ public class PointwisePatternTest {
 	public void testNTo7N() {
 		final int N = 11;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(N);
 		v2.setParallelism(7 * N);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -244,15 +244,15 @@ public class PointwisePatternTest {
 		final int factor = highDop / lowDop;
 		final int delta = highDop % lowDop == 0 ? 0 : 1;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(lowDop);
 		v2.setParallelism(highDop);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {
@@ -290,15 +290,15 @@ public class PointwisePatternTest {
 		final int factor = highDop / lowDop;
 		final int delta = highDop % lowDop == 0 ? 0 : 1;
 		
-		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
-		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
 	
 		v1.setParallelism(highDop);
 		v2.setParallelism(lowDop);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
-		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 8cba6ca..376ff14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -105,10 +105,10 @@ public class TerminalStateDeadlockTest {
 			
 			final Configuration jobConfig = new Configuration();
 			
-			final List<AbstractJobVertex> vertices;
+			final List<JobVertex> vertices;
 			{
-				AbstractJobVertex v1 = new AbstractJobVertex("v1", vid1);
-				AbstractJobVertex v2 = new AbstractJobVertex("v2", vid2);
+				JobVertex v1 = new JobVertex("v1", vid1);
+				JobVertex v2 = new JobVertex("v2", vid2);
 				v1.setParallelism(1);
 				v2.setParallelism(1);
 				v1.setInvokableClass(DummyInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index cae10f4..756b9a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -97,7 +97,7 @@ public class VertexLocationConstraintTest {
 			scheduler.newInstanceAvailable(instance3);
 			
 			// prepare the execution graph
-			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(2);
 			JobGraph jg = new JobGraph("test job", jobVertex);
@@ -163,7 +163,7 @@ public class VertexLocationConstraintTest {
 			scheduler.newInstanceAvailable(instance3);
 			
 			// prepare the execution graph
-			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(2);
 			JobGraph jg = new JobGraph("test job", jobVertex);
@@ -225,8 +225,8 @@ public class VertexLocationConstraintTest {
 			scheduler.newInstanceAvailable(instance3);
 			
 			// prepare the execution graph
-			AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID());
-			AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID());
+			JobVertex jobVertex1 = new JobVertex("v1", new JobVertexID());
+			JobVertex jobVertex2 = new JobVertex("v2", new JobVertexID());
 			jobVertex1.setInvokableClass(DummyInvokable.class);
 			jobVertex2.setInvokableClass(DummyInvokable.class);
 			jobVertex1.setParallelism(2);
@@ -294,7 +294,7 @@ public class VertexLocationConstraintTest {
 			scheduler.newInstanceAvailable(instance1);
 			
 			// prepare the execution graph
-			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(1);
 			JobGraph jg = new JobGraph("test job", jobVertex);
@@ -347,8 +347,8 @@ public class VertexLocationConstraintTest {
 			scheduler.newInstanceAvailable(instance1);
 			
 			// prepare the execution graph
-			AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID());
-			AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID());
+			JobVertex jobVertex1 = new JobVertex("v1", new JobVertexID());
+			JobVertex jobVertex2 = new JobVertex("v2", new JobVertexID());
 			
 			jobVertex1.setInvokableClass(DummyInvokable.class);
 			jobVertex2.setInvokableClass(DummyInvokable.class);
@@ -392,7 +392,7 @@ public class VertexLocationConstraintTest {
 	@Test
 	public void testArchivingClearsFields() {
 		try {
-			AbstractJobVertex vertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			JobVertex vertex = new JobVertex("test vertex", new JobVertexID());
 			JobGraph jg = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 17d78d2..a1d6d03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -43,11 +43,11 @@ public class VertexSlotSharingTest {
 	@Test
 	public void testAssignSlotSharingGroup() {
 		try {
-			AbstractJobVertex v1 = new AbstractJobVertex("v1");
-			AbstractJobVertex v2 = new AbstractJobVertex("v2");
-			AbstractJobVertex v3 = new AbstractJobVertex("v3");
-			AbstractJobVertex v4 = new AbstractJobVertex("v4");
-			AbstractJobVertex v5 = new AbstractJobVertex("v5");
+			JobVertex v1 = new JobVertex("v1");
+			JobVertex v2 = new JobVertex("v2");
+			JobVertex v3 = new JobVertex("v3");
+			JobVertex v4 = new JobVertex("v4");
+			JobVertex v5 = new JobVertex("v5");
 			
 			v1.setParallelism(4);
 			v2.setParallelism(5);
@@ -66,7 +66,7 @@ public class VertexSlotSharingTest {
 			v4.setSlotSharingGroup(jg2);
 			v5.setSlotSharingGroup(jg2);
 			
-			List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
 			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration(),
 					AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index e49f19f..8709395 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -409,8 +409,8 @@ public class SharedSlotsTest {
 			JobVertexID tailId = new JobVertexID();
 			JobVertexID sinkId = new JobVertexID();
 
-			AbstractJobVertex headVertex = new AbstractJobVertex("head", headId);
-			AbstractJobVertex tailVertex = new AbstractJobVertex("tail", tailId);
+			JobVertex headVertex = new JobVertex("head", headId);
+			JobVertex tailVertex = new JobVertex("tail", tailId);
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId);
 			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
@@ -545,8 +545,8 @@ public class SharedSlotsTest {
 			JobVertexID tailId = new JobVertexID();
 			JobVertexID sinkId = new JobVertexID();
 
-			AbstractJobVertex headVertex = new AbstractJobVertex("head", headId);
-			AbstractJobVertex tailVertex = new AbstractJobVertex("tail", tailId);
+			JobVertex headVertex = new JobVertex("head", headId);
+			JobVertex tailVertex = new JobVertex("tail", tailId);
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId);
 			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
@@ -633,7 +633,7 @@ public class SharedSlotsTest {
 		try {
 			JobID jobId = new JobID();
 			JobVertexID vid = new JobVertexID();
-			AbstractJobVertex vertex = new AbstractJobVertex("vertex", vid);
+			JobVertex vertex = new JobVertex("vertex", vid);
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
 			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 821826a..7e15af4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -78,11 +78,11 @@ public class PartialConsumePipelinedResultTest {
 	 */
 	@Test
 	public void testPartialConsumePipelinedResultReceiver() throws Exception {
-		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+		final JobVertex sender = new JobVertex("Sender");
 		sender.setInvokableClass(SlowBufferSender.class);
 		sender.setParallelism(PARALLELISM);
 
-		final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+		final JobVertex receiver = new JobVertex("Receiver");
 		receiver.setInvokableClass(SingleBufferReceiver.class);
 		receiver.setParallelism(PARALLELISM);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index aac6948..9f88bd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -41,9 +41,9 @@ public class JobGraphTest {
 			
 			// add some vertices
 			{
-				AbstractJobVertex source1 = new AbstractJobVertex("source1");
-				AbstractJobVertex source2 = new AbstractJobVertex("source2");
-				AbstractJobVertex target = new AbstractJobVertex("target");
+				JobVertex source1 = new JobVertex("source1");
+				JobVertex source2 = new JobVertex("source2");
+				JobVertex target = new JobVertex("target");
 				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
 				target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL);
 				
@@ -60,8 +60,8 @@ public class JobGraphTest {
 			assertEquals(jg.getJobConfiguration(), copy.getJobConfiguration());
 			assertEquals(jg.getNumberOfVertices(), copy.getNumberOfVertices());
 			
-			for (AbstractJobVertex vertex : copy.getVertices()) {
-				AbstractJobVertex original = jg.findVertexByID(vertex.getID());
+			for (JobVertex vertex : copy.getVertices()) {
+				JobVertex original = jg.findVertexByID(vertex.getID());
 				assertNotNull(original);
 				assertEquals(original.getName(), vertex.getName());
 				assertEquals(original.getNumberOfInputs(), vertex.getNumberOfInputs());
@@ -77,12 +77,12 @@ public class JobGraphTest {
 	@Test
 	public void testTopologicalSort1() {
 		try {
-			AbstractJobVertex source1 = new AbstractJobVertex("source1");
-			AbstractJobVertex source2 = new AbstractJobVertex("source2");
-			AbstractJobVertex target1 = new AbstractJobVertex("target1");
-			AbstractJobVertex target2 = new AbstractJobVertex("target2");
-			AbstractJobVertex intermediate1 = new AbstractJobVertex("intermediate1");
-			AbstractJobVertex intermediate2 = new AbstractJobVertex("intermediate2");
+			JobVertex source1 = new JobVertex("source1");
+			JobVertex source2 = new JobVertex("source2");
+			JobVertex target1 = new JobVertex("target1");
+			JobVertex target2 = new JobVertex("target2");
+			JobVertex intermediate1 = new JobVertex("intermediate1");
+			JobVertex intermediate2 = new JobVertex("intermediate2");
 			
 			target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
 			target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
@@ -91,7 +91,7 @@ public class JobGraphTest {
 			intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
 			
 			JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2);
-			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
 			assertEquals(6, sorted.size());
 			
@@ -112,13 +112,13 @@ public class JobGraphTest {
 	@Test
 	public void testTopologicalSort2() {
 		try {
-			AbstractJobVertex source1 = new AbstractJobVertex("source1");
-			AbstractJobVertex source2 = new AbstractJobVertex("source2");
-			AbstractJobVertex root = new AbstractJobVertex("root");
-			AbstractJobVertex l11 = new AbstractJobVertex("layer 1 - 1");
-			AbstractJobVertex l12 = new AbstractJobVertex("layer 1 - 2");
-			AbstractJobVertex l13 = new AbstractJobVertex("layer 1 - 3");
-			AbstractJobVertex l2 = new AbstractJobVertex("layer 2");
+			JobVertex source1 = new JobVertex("source1");
+			JobVertex source2 = new JobVertex("source2");
+			JobVertex root = new JobVertex("root");
+			JobVertex l11 = new JobVertex("layer 1 - 1");
+			JobVertex l12 = new JobVertex("layer 1 - 2");
+			JobVertex l13 = new JobVertex("layer 1 - 3");
+			JobVertex l2 = new JobVertex("layer 2");
 			
 			root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE);
 			root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
@@ -135,7 +135,7 @@ public class JobGraphTest {
 			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
 			
 			JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2);
-			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
 			assertEquals(7,  sorted.size());
 			
@@ -170,10 +170,10 @@ public class JobGraphTest {
 		//             ---------
 		
 		try {
-			AbstractJobVertex source = new AbstractJobVertex("source");
-			AbstractJobVertex op1 = new AbstractJobVertex("op4");
-			AbstractJobVertex op2 = new AbstractJobVertex("op2");
-			AbstractJobVertex op3 = new AbstractJobVertex("op3");
+			JobVertex source = new JobVertex("source");
+			JobVertex op1 = new JobVertex("op4");
+			JobVertex op2 = new JobVertex("op2");
+			JobVertex op3 = new JobVertex("op3");
 			
 			op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
 			op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE);
@@ -181,7 +181,7 @@ public class JobGraphTest {
 			op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
 			
 			JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
-			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
 			assertEquals(4,  sorted.size());
 			
@@ -199,10 +199,10 @@ public class JobGraphTest {
 	@Test
 	public void testTopoSortCyclicGraphNoSources() {
 		try {
-			AbstractJobVertex v1 = new AbstractJobVertex("1");
-			AbstractJobVertex v2 = new AbstractJobVertex("2");
-			AbstractJobVertex v3 = new AbstractJobVertex("3");
-			AbstractJobVertex v4 = new AbstractJobVertex("4");
+			JobVertex v1 = new JobVertex("1");
+			JobVertex v2 = new JobVertex("2");
+			JobVertex v3 = new JobVertex("3");
+			JobVertex v4 = new JobVertex("4");
 			
 			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
@@ -227,12 +227,12 @@ public class JobGraphTest {
 	@Test
 	public void testTopoSortCyclicGraphIntermediateCycle() {
 		try{ 
-			AbstractJobVertex source = new AbstractJobVertex("source");
-			AbstractJobVertex v1 = new AbstractJobVertex("1");
-			AbstractJobVertex v2 = new AbstractJobVertex("2");
-			AbstractJobVertex v3 = new AbstractJobVertex("3");
-			AbstractJobVertex v4 = new AbstractJobVertex("4");
-			AbstractJobVertex target = new AbstractJobVertex("target");
+			JobVertex source = new JobVertex("source");
+			JobVertex v1 = new JobVertex("1");
+			JobVertex v2 = new JobVertex("2");
+			JobVertex v3 = new JobVertex("3");
+			JobVertex v4 = new JobVertex("4");
+			JobVertex target = new JobVertex("target");
 			
 			v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
 			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
@@ -256,9 +256,9 @@ public class JobGraphTest {
 		}
 	}
 	
-	private static final void assertBefore(AbstractJobVertex v1, AbstractJobVertex v2, List<AbstractJobVertex> list) {
+	private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
 		boolean seenFirst = false;
-		for (AbstractJobVertex v : list) {
+		for (JobVertex v : list) {
 			if (v == v1) {
 				seenFirst = true;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index b1d05e1..2511bd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -40,8 +40,8 @@ public class JobTaskVertexTest {
 
 	@Test
 	public void testConnectDirectly() {
-		AbstractJobVertex source = new AbstractJobVertex("source");
-		AbstractJobVertex target = new AbstractJobVertex("target");
+		JobVertex source = new JobVertex("source");
+		JobVertex target = new JobVertex("target");
 		target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
 		
 		assertTrue(source.isInputVertex());
@@ -60,9 +60,9 @@ public class JobTaskVertexTest {
 	
 	@Test
 	public void testConnectMultipleTargets() {
-		AbstractJobVertex source = new AbstractJobVertex("source");
-		AbstractJobVertex target1= new AbstractJobVertex("target1");
-		AbstractJobVertex target2 = new AbstractJobVertex("target2");
+		JobVertex source = new JobVertex("source");
+		JobVertex target1= new JobVertex("target1");
+		JobVertex target2 = new JobVertex("target2");
 		target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
 		target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.ALL_TO_ALL);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 9dd078d..3eb4be8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -105,7 +105,7 @@ public class JobManagerTest {
 				final IntermediateDataSetID rid = new IntermediateDataSetID();
 
 				// Create a task
-				final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+				final JobVertex sender = new JobVertex("Sender");
 				sender.setParallelism(1);
 				sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
 				sender.createAndAddResultDataSet(rid, PIPELINED);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 7b6d688..2f11c08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
@@ -76,7 +76,7 @@ public class JobSubmitTest {
 	public void testFailureWhenJarBlobsMissing() {
 		try {
 			// create a simple job graph
-			AbstractJobVertex jobVertex = new AbstractJobVertex("Test Vertex");
+			JobVertex jobVertex = new JobVertex("Test Vertex");
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
@@ -130,7 +130,7 @@ public class JobSubmitTest {
 		try {
 			// create a simple job graph
 
-			AbstractJobVertex jobVertex = new AbstractJobVertex("Vertex that fails in initializeOnMaster") {
+			JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster") {
 
 				@Override
 				public void initializeOnMaster(ClassLoader loader) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index b404aae..8272bf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,12 +86,12 @@ public class SlotCountExceedingParallelismTest {
 			int receiverParallelism) {
 
 		// The sender and receiver invokable logic ensure that each subtask gets the expected data
-		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+		final JobVertex sender = new JobVertex("Sender");
 		sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
 		sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
 		sender.setParallelism(senderParallelism);
 
-		final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+		final JobVertex receiver = new JobVertex("Receiver");
 		receiver.setInvokableClass(SubtaskIndexReceiver.class);
 		receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
 		receiver.setParallelism(receiverParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 94dee71..3bd4368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.AbstractID;
 import org.junit.Test;
@@ -40,10 +40,10 @@ public class CoLocationConstraintTest {
 			JobVertexID id1 = new JobVertexID();
 			JobVertexID id2 = new JobVertexID();
 
-			AbstractJobVertex vertex1 = new AbstractJobVertex("vertex1", id1);
+			JobVertex vertex1 = new JobVertex("vertex1", id1);
 			vertex1.setParallelism(2);
 			
-			AbstractJobVertex vertex2 = new AbstractJobVertex("vertex2", id2);
+			JobVertex vertex2 = new JobVertex("vertex2", id2);
 			vertex2.setParallelism(3);
 			
 			CoLocationGroup group = new CoLocationGroup(vertex1, vertex2);
@@ -74,7 +74,7 @@ public class CoLocationConstraintTest {
 		try {
 			JobID jid = new JobID();
 					
-			AbstractJobVertex vertex = new AbstractJobVertex("vertex");
+			JobVertex vertex = new JobVertex("vertex");
 			vertex.setParallelism(1);
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vertex.getID());

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 08f8bfb..3bcac56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -79,12 +79,12 @@ public class ScheduleOrUpdateConsumersTest {
 	 */
 	@Test
 	public void testMixedPipelinedAndBlockingResults() throws Exception {
-		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+		final JobVertex sender = new JobVertex("Sender");
 		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
 		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
 		sender.setParallelism(PARALLELISM);
 
-		final AbstractJobVertex pipelinedReceiver = new AbstractJobVertex("Pipelined Receiver");
+		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
 		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
 		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
 		pipelinedReceiver.setParallelism(PARALLELISM);
@@ -94,7 +94,7 @@ public class ScheduleOrUpdateConsumersTest {
 				DistributionPattern.ALL_TO_ALL,
 				ResultPartitionType.PIPELINED);
 
-		final AbstractJobVertex blockingReceiver = new AbstractJobVertex("Blocking Receiver");
+		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
 		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
 		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
 		blockingReceiver.setParallelism(PARALLELISM);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 77193f1..67a5f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -77,12 +77,12 @@ public class TaskCancelTest {
 			// Setup
 			final JobGraph jobGraph = new JobGraph("Cancel Big Union");
 
-			AbstractJobVertex[] sources = new AbstractJobVertex[numberOfSources];
+			JobVertex[] sources = new JobVertex[numberOfSources];
 			SlotSharingGroup group = new SlotSharingGroup();
 
 			// Create multiple sources
 			for (int i = 0; i < sources.length; i++) {
-				sources[i] = new AbstractJobVertex("Source " + i);
+				sources[i] = new JobVertex("Source " + i);
 				sources[i].setInvokableClass(InfiniteSource.class);
 				sources[i].setParallelism(sourceParallelism);
 				sources[i].setSlotSharingGroup(group);
@@ -92,14 +92,14 @@ public class TaskCancelTest {
 			}
 
 			// Union all sources
-			AbstractJobVertex union = new AbstractJobVertex("Union");
+			JobVertex union = new JobVertex("Union");
 			union.setInvokableClass(AgnosticUnion.class);
 			union.setParallelism(sourceParallelism);
 
 			jobGraph.addVertex(union);
 
 			// Each source creates a separate result
-			for (AbstractJobVertex source : sources) {
+			for (JobVertex source : sources) {
 				union.connectNewDataSetAsInput(
 						source,
 						DistributionPattern.POINTWISE,

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index a36ded9..a1ca43c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -23,7 +23,7 @@ import akka.testkit.TestKit
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -50,7 +50,7 @@ with Matchers with BeforeAndAfterAll {
         val scheduler = new Scheduler
         scheduler.newInstanceAvailable(instance)
 
-        val sender = new AbstractJobVertex("Task")
+        val sender = new JobVertex("Task")
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(NUM_TASKS)
 
@@ -94,7 +94,7 @@ with Matchers with BeforeAndAfterAll {
         val scheduler = new Scheduler
         scheduler.newInstanceAvailable(instance)
 
-        val sender = new AbstractJobVertex("Task")
+        val sender = new JobVertex("Task")
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(NUM_TASKS)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 1a36112..13199bc 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
 .SimpleAcknowledgingTaskManager
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -53,7 +53,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         scheduler.newInstanceAvailable(instance1)
         scheduler.newInstanceAvailable(instance2)
 
-        val sender = new AbstractJobVertex("Task")
+        val sender = new JobVertex("Task")
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(20)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 07f0ce5..19f1ea1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -21,8 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern,
-AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
@@ -45,8 +44,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with WrapA
     "support colocation constraints and slot sharing" in {
       val num_tasks = 31
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[Receiver])

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index ee584f0..750da83 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -25,7 +25,7 @@ import akka.pattern.ask
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode}
+import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingUtils
@@ -54,7 +54,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
   "The JobManager actor" must {
 
     "handle jobs when not enough slots" in {
-      val vertex = new AbstractJobVertex("Test Vertex")
+      val vertex = new JobVertex("Test Vertex")
       vertex.setParallelism(2)
       vertex.setInvokableClass(classOf[BlockingNoOpInvokable])
 
@@ -99,7 +99,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support immediate scheduling of a single vertex" in {
       val num_tasks = 133
-      val vertex = new AbstractJobVertex("Test Vertex")
+      val vertex = new JobVertex("Test Vertex")
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
@@ -134,7 +134,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "support queued scheduling of a single vertex" in {
       val num_tasks = 111
 
-      val vertex = new AbstractJobVertex("Test Vertex")
+      val vertex = new JobVertex("Test Vertex")
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
@@ -163,8 +163,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support forward jobs" in {
       val num_tasks = 31
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -198,8 +198,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support bipartite job" in {
       val num_tasks = 31
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[AgnosticReceiver])
@@ -231,9 +231,9 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support two input job failing edge mismatch" in {
       val num_tasks = 1
-      val sender1 = new AbstractJobVertex("Sender1")
-      val sender2 = new AbstractJobVertex("Sender2")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender1 = new JobVertex("Sender1")
+      val sender2 = new JobVertex("Sender2")
+      val receiver = new JobVertex("Receiver")
 
       sender1.setInvokableClass(classOf[Sender])
       sender2.setInvokableClass(classOf[Sender])
@@ -275,9 +275,9 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support two input job" in {
       val num_tasks = 11
-      val sender1 = new AbstractJobVertex("Sender1")
-      val sender2 = new AbstractJobVertex("Sender2")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender1 = new JobVertex("Sender1")
+      val sender2 = new JobVertex("Sender2")
+      val receiver = new JobVertex("Receiver")
 
       sender1.setInvokableClass(classOf[Sender])
       sender2.setInvokableClass(classOf[Sender])
@@ -312,9 +312,9 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "support scheduling all at once" in {
       val num_tasks = 16
-      val sender = new AbstractJobVertex("Sender")
-      val forwarder = new AbstractJobVertex("Forwarder")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val forwarder = new JobVertex("Forwarder")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       forwarder.setInvokableClass(classOf[Forwarder])
@@ -357,8 +357,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "handle job with a failing sender vertex" in {
       val num_tasks = 100
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[ExceptionSender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -402,8 +402,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "handle job with an occasionally failing sender vertex" in {
       val num_tasks = 100
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[SometimesExceptionSender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -449,8 +449,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "handle job with a failing receiver vertex" in {
       val num_tasks = 200
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[ExceptionReceiver])
@@ -488,8 +488,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "handle job with all vertices failing during instantiation" in {
       val num_tasks = 200
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[InstantiationErrorSender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -531,8 +531,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "handle job with some vertices failing during instantiation" in {
       val num_tasks = 200
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -579,7 +579,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       "completes" in {
       val num_tasks = 31
 
-      val source = new AbstractJobVertex("Source")
+      val source = new JobVertex("Source")
       val sink = new WaitingOnFinalizeJobVertex("Sink", 500)
 
       source.setInvokableClass(classOf[WaitingNoOpInvokable])
@@ -611,8 +611,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     }
   }
 
-  class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends
-  AbstractJobVertex(name){
+  class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends JobVertex(name){
     var finished = false
 
     override def finalizeOnMaster(loader: ClassLoader): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index dfc650e..766ea55 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,10 +22,10 @@ import akka.actor.Status.Success
 import akka.actor.{ActorRef, PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{ JobResultSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
@@ -59,8 +59,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover once failing forward job" in {
       FailingOnceReceiver.failed = false
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
@@ -98,8 +98,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover once failing forward job with slot sharing" in {
       FailingOnceReceiver.failed = false
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver])
@@ -141,8 +141,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "recover a task manager failure" in {
       BlockingOnceReceiver.blocking = true
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Tasks.Sender])
       receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver])

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index faff2f2..48b7971 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
@@ -43,8 +43,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "support slot sharing for forward job" in {
       val num_tasks = 31
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[Receiver])
@@ -82,9 +82,9 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     "support jobs with two inputs and slot sharing" in {
       val num_tasks = 11
 
-      val sender1 = new AbstractJobVertex("Sender1")
-      val sender2 = new AbstractJobVertex("Sender2")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender1 = new JobVertex("Sender1")
+      val sender2 = new JobVertex("Sender2")
+      val receiver = new JobVertex("Receiver")
 
       sender1.setInvokableClass(classOf[Sender])
       sender2.setInvokableClass(classOf[Sender])

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index e98fd98..0fd7e1b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.Status.{Failure, Success}
 import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob
@@ -46,8 +46,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     "handle gracefully failing task manager with slot sharing" in {
       val num_tasks = 100
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[BlockingReceiver])
@@ -95,8 +95,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     "handle hard failing task manager with slot sharing" in {
       val num_tasks = 20
 
-      val sender = new AbstractJobVertex("Sender")
-      val receiver = new AbstractJobVertex("Receiver")
+      val sender = new JobVertex("Sender")
+      val receiver = new JobVertex("Receiver")
 
       sender.setInvokableClass(classOf[Sender])
       receiver.setInvokableClass(classOf[BlockingReceiver])

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6a6e899..531fc71 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,7 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -55,7 +55,7 @@ public class StreamingJobGraphGenerator {
 
 	private StreamGraph streamGraph;
 
-	private Map<Integer, AbstractJobVertex> jobVertices;
+	private Map<Integer, JobVertex> jobVertices;
 	private JobGraph jobGraph;
 	private Collection<Integer> builtVertices;
 
@@ -71,7 +71,7 @@ public class StreamingJobGraphGenerator {
 	}
 
 	private void init() {
-		this.jobVertices = new HashMap<Integer, AbstractJobVertex>();
+		this.jobVertices = new HashMap<Integer, JobVertex>();
 		this.builtVertices = new HashSet<Integer>();
 		this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
 		this.vertexConfigs = new HashMap<Integer, StreamConfig>();
@@ -220,7 +220,7 @@ public class StreamingJobGraphGenerator {
 
 	private StreamConfig createProcessingVertex(Integer vertexID) {
 
-		AbstractJobVertex jobVertex = new AbstractJobVertex(chainedNames.get(vertexID));
+		JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
 		StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
 		jobVertex.setInvokableClass(vertex.getJobVertexClass());
@@ -294,8 +294,8 @@ public class StreamingJobGraphGenerator {
 
 		Integer downStreamvertexID = edge.getTargetId();
 
-		AbstractJobVertex headVertex = jobVertices.get(headOfChain);
-		AbstractJobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
+		JobVertex headVertex = jobVertices.get(headOfChain);
+		JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
 
 		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
 
@@ -342,7 +342,7 @@ public class StreamingJobGraphGenerator {
 
 		Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();
 
-		for (Entry<Integer, AbstractJobVertex> entry : jobVertices.entrySet()) {
+		for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
 
 			int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
 
@@ -358,8 +358,8 @@ public class StreamingJobGraphGenerator {
 
 		for (StreamLoop loop : streamGraph.getStreamLoops()) {
 			CoLocationGroup ccg = new CoLocationGroup();
-			AbstractJobVertex tail = jobVertices.get(loop.getSink().getId());
-			AbstractJobVertex head = jobVertices.get(loop.getSource().getId());
+			JobVertex tail = jobVertices.get(loop.getSink().getId());
+			JobVertex head = jobVertices.get(loop.getSource().getId());
 			ccg.addVertex(head);
 			ccg.addVertex(tail);
 			tail.updateCoLocationGroup(ccg);
@@ -388,7 +388,7 @@ public class StreamingJobGraphGenerator {
 			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
 			
 			
-			for (AbstractJobVertex vertex : jobVertices.values()) {
+			for (JobVertex vertex : jobVertices.values()) {
 				if (vertex.isInputVertex()) {
 					triggerVertices.add(vertex.getID());
 					commitVertices.add(vertex.getID());


Mime
View raw message