flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information
Date Thu, 10 Nov 2016 17:47:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master 86a3dd586 -> 58204da13


http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0eb0607..7a28b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -56,6 +57,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(N);
 		v2.setParallelism(N);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -98,6 +102,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(2 * N);
 		v2.setParallelism(N);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -141,6 +148,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(3 * N);
 		v2.setParallelism(N);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -185,6 +195,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(N);
 		v2.setParallelism(2 * N);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -227,6 +240,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(N);
 		v2.setParallelism(7 * N);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -289,6 +305,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(lowDop);
 		v2.setParallelism(highDop);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	
@@ -342,6 +361,9 @@ public class PointwisePatternTest {
 	
 		v1.setParallelism(highDop);
 		v2.setParallelism(lowDop);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 	
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 6314969..0c95695 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -58,7 +59,13 @@ public class VertexSlotSharingTest {
 			v3.setParallelism(7);
 			v4.setParallelism(1);
 			v5.setParallelism(11);
-			
+
+			v1.setInvokableClass(AbstractInvokable.class);
+			v2.setInvokableClass(AbstractInvokable.class);
+			v3.setInvokableClass(AbstractInvokable.class);
+			v4.setInvokableClass(AbstractInvokable.class);
+			v5.setInvokableClass(AbstractInvokable.class);
+
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
 			v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 49f2268..f4f0d71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -27,11 +27,14 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.util.SerializedValue;
@@ -41,6 +44,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
 
 import static org.junit.Assert.*;
 
@@ -74,7 +78,7 @@ public class TaskManagerGroupTest extends TestLogger {
 		final ExecutionAttemptID execution13 = new ExecutionAttemptID();
 		final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
-		TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 			jid1, 
 			jobName1, 
 			vertex11, 
@@ -89,7 +93,7 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(), 
 			new ArrayList<URL>(), 0);
 
-		TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 			jid1,
 			jobName1,
 			vertex12,
@@ -104,7 +108,7 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 
-		TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd3 = createTaskDeploymentDescriptor(
 			jid2,
 			jobName2,
 			vertex21,
@@ -119,7 +123,7 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 
-		TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd4 = createTaskDeploymentDescriptor(
 			jid1,
 			jobName1,
 			vertex13,
@@ -134,9 +138,12 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 		
-		TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
-		TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
-		TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
+		TaskMetricGroup tmGroup11 = group.addTaskForJob(
+			jid1, jobName1, vertex11, execution11, "test", 17, 0);
+		TaskMetricGroup tmGroup12 = group.addTaskForJob(
+			jid1, jobName1, vertex12, execution12, "test", 13, 1);
+		TaskMetricGroup tmGroup21 = group.addTaskForJob(
+			jid2, jobName2, vertex21, execution21, "test", 7, 2);
 		
 		assertEquals(2, group.numRegisteredJobMetricGroups());
 		assertFalse(tmGroup11.parent().isClosed());
@@ -156,7 +163,8 @@ public class TaskManagerGroupTest extends TestLogger {
 		assertEquals(1, group.numRegisteredJobMetricGroups());
 		
 		// add one more to job one
-		TaskMetricGroup tmGroup13 = group.addTaskForJob(tdd4);
+		TaskMetricGroup tmGroup13 = group.addTaskForJob(
+			jid1, jobName1, vertex13, execution13, "test", 0, 0);
 		tmGroup12.close();
 		tmGroup13.close();
 
@@ -190,7 +198,7 @@ public class TaskManagerGroupTest extends TestLogger {
 		final ExecutionAttemptID execution12 = new ExecutionAttemptID();
 		final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
-		TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 			jid1,
 			jobName1,
 			vertex11,
@@ -205,7 +213,7 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 
-		TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 			jid1,
 			jobName1,
 			vertex12,
@@ -220,7 +228,7 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 
-		TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
+		TaskDeploymentDescriptor tdd3 = createTaskDeploymentDescriptor(
 			jid2,
 			jobName2,
 			vertex21,
@@ -235,9 +243,12 @@ public class TaskManagerGroupTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(), 0);
 
-		TaskMetricGroup tmGroup11 = group.addTaskForJob(tdd1);
-		TaskMetricGroup tmGroup12 = group.addTaskForJob(tdd2);
-		TaskMetricGroup tmGroup21 = group.addTaskForJob(tdd3);
+		TaskMetricGroup tmGroup11 = group.addTaskForJob(
+			jid1, jobName1, vertex11, execution11, "test", 17, 0);
+		TaskMetricGroup tmGroup12 = group.addTaskForJob(
+			jid1, jobName1, vertex12, execution12, "test", 13, 1);
+		TaskMetricGroup tmGroup21 = group.addTaskForJob(
+			jid2, jobName2, vertex21, execution21, "test", 7, 1);
 		
 		group.close();
 		
@@ -283,4 +294,56 @@ public class TaskManagerGroupTest extends TestLogger {
 		assertEquals("", info.scope);
 		assertEquals("id", info.taskManagerID);
 	}
+
+	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+		JobID jobId,
+		String jobName,
+		JobVertexID jobVertexId,
+		ExecutionAttemptID executionAttemptId,
+		SerializedValue<ExecutionConfig> serializedExecutionConfig,
+		String taskName,
+		int numberOfKeyGroups,
+		int subtaskIndex,
+		int parallelism,
+		int attemptNumber,
+		Configuration jobConfiguration,
+		Configuration taskConfiguration,
+		String invokableClassName,
+		Collection<ResultPartitionDeploymentDescriptor> producedPartitions,
+		Collection<InputGateDeploymentDescriptor> inputGates,
+		Collection<BlobKey> requiredJarFiles,
+		Collection<URL> requiredClasspaths,
+		int targetSlotNumber) throws IOException {
+
+		JobInformation jobInformation = new JobInformation(
+			jobId,
+			jobName,
+			serializedExecutionConfig,
+			jobConfiguration,
+			requiredJarFiles,
+			requiredClasspaths);
+
+		TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			taskName,
+			parallelism,
+			numberOfKeyGroups,
+			invokableClassName,
+			taskConfiguration);
+
+		SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
+		SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
+
+		return new TaskDeploymentDescriptor(
+			serializedJobInformation,
+			serializedJobVertexInformation,
+			executionAttemptId,
+			subtaskIndex,
+			attemptNumber,
+			targetSlotNumber,
+			new TaskStateHandles(),
+			producedPartitions,
+			inputGates);
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0a522af..e37467b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -27,10 +27,11 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -152,19 +153,32 @@ public class TaskAsyncCallTest {
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(), new Configuration(),
-				CheckpointsInOrderInvokable.class.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
-
-		return new Task(tdd,
+		JobInformation jobInformation = new JobInformation(
+			new JobID(),
+			"Job Name",
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"Test Task",
+			1,
+			1,
+			CheckpointsInOrderInvokable.class.getName(),
+			new Configuration());
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			new ExecutionAttemptID(),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			new TaskStateHandles(),
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a203130..ad107b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -90,6 +92,7 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -170,12 +173,13 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid = new ExecutionAttemptID();
 				final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
-						"TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
-						TestInvokableCorrect.class.getName(),
-						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+					jid, "TestJob", vid, eid, executionConfig,
+					"TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
+					TestInvokableCorrect.class.getName(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
+					new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 
 				new Within(d) {
@@ -267,7 +271,7 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 						jid1, "TestJob1", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"TestTask1", 5, 1, 5, 0,
@@ -276,7 +280,7 @@ public class TaskManagerTest extends TestLogger {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid2, "TestJob2", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"TestTask2", 7, 2, 7, 0,
@@ -406,13 +410,13 @@ public class TaskManagerTest extends TestLogger {
 
 				final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
+				final TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
 						"TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
+				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
 						"TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -531,7 +535,7 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
@@ -540,7 +544,7 @@ public class TaskManagerTest extends TestLogger {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
@@ -636,7 +640,7 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
@@ -644,7 +648,7 @@ public class TaskManagerTest extends TestLogger {
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
 						Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
@@ -781,7 +785,7 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd1 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
@@ -789,7 +793,7 @@ public class TaskManagerTest extends TestLogger {
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
@@ -929,7 +933,7 @@ public class TaskManagerTest extends TestLogger {
 				final InputGateDeploymentDescriptor igdd =
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 1, 0, 1, 0,
@@ -1022,7 +1026,7 @@ public class TaskManagerTest extends TestLogger {
 				final InputGateDeploymentDescriptor igdd =
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 1, 0, 1, 0,
@@ -1096,9 +1100,11 @@ public class TaskManagerTest extends TestLogger {
 						true,
 						false);
 
+				final JobID jobId = new JobID();
+
 				// Single blocking task
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						new JobID(),
+				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+						jobId,
 						"Job",
 						new JobVertexID(),
 						new ExecutionAttemptID(),
@@ -1130,7 +1136,7 @@ public class TaskManagerTest extends TestLogger {
 
 							Future<Object> taskRunningFuture = taskManager.ask(
 									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
-											tdd.getExecutionId()), timeout);
+											tdd.getExecutionAttemptId()), timeout);
 
 							taskManager.tell(new SubmitTask(tdd));
 
@@ -1190,7 +1196,7 @@ public class TaskManagerTest extends TestLogger {
 
 								taskManager.tell(new TriggerStackTraceSample(
 												19230,
-												tdd.getExecutionId(),
+												tdd.getExecutionAttemptId(),
 												numSamples,
 												Time.milliseconds(100L),
 												0),
@@ -1206,7 +1212,7 @@ public class TaskManagerTest extends TestLogger {
 
 								// ---- Verify response ----
 								assertEquals(19230, response.getSampleId());
-								assertEquals(tdd.getExecutionId(), response.getExecutionAttemptID());
+								assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
 
 								List<StackTraceElement[]> traces = response.getSamples();
 
@@ -1262,7 +1268,7 @@ public class TaskManagerTest extends TestLogger {
 
 							taskManager.tell(new TriggerStackTraceSample(
 											1337,
-											tdd.getExecutionId(),
+											tdd.getExecutionAttemptId(),
 											numSamples,
 											Time.milliseconds(100L),
 											maxDepth),
@@ -1278,7 +1284,7 @@ public class TaskManagerTest extends TestLogger {
 
 							// ---- Verify response ----
 							assertEquals(1337, response.getSampleId());
-							assertEquals(tdd.getExecutionId(), response.getExecutionAttemptID());
+							assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
 
 							List<StackTraceElement[]> traces = response.getSamples();
 
@@ -1309,7 +1315,7 @@ public class TaskManagerTest extends TestLogger {
 								taskManager.tell(
 									new TriggerStackTraceSample(
 										44,
-										tdd.getExecutionId(),
+										tdd.getExecutionAttemptId(),
 										Integer.MAX_VALUE,
 										Time.milliseconds(10L),
 										0),
@@ -1318,11 +1324,11 @@ public class TaskManagerTest extends TestLogger {
 								Thread.sleep(sleepTime);
 
 								Future<?> removeFuture = taskManager.ask(
-										new TestingJobManagerMessages.NotifyWhenJobRemoved(tdd.getJobID()),
+										new TestingJobManagerMessages.NotifyWhenJobRemoved(jobId),
 										remaining());
 
 								// Cancel the task
-								taskManager.tell(new CancelTask(tdd.getExecutionId()));
+								taskManager.tell(new CancelTask(tdd.getExecutionAttemptId()));
 
 								// Receive the expected message (heartbeat races possible)
 								while (true) {
@@ -1330,7 +1336,7 @@ public class TaskManagerTest extends TestLogger {
 									if (msg[0] instanceof StackTraceSampleResponse) {
 										StackTraceSampleResponse response = (StackTraceSampleResponse) msg[0];
 
-										assertEquals(tdd.getExecutionId(), response.getExecutionAttemptID());
+										assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
 										assertEquals(44, response.getSampleId());
 
 										// Done
@@ -1341,7 +1347,7 @@ public class TaskManagerTest extends TestLogger {
 
 										Future<?> taskRunningFuture = taskManager.ask(
 												new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
-														tdd.getExecutionId()), timeout);
+														tdd.getExecutionAttemptId()), timeout);
 
 										// Resubmit
 										taskManager.tell(new SubmitTask(tdd));
@@ -1425,7 +1431,7 @@ public class TaskManagerTest extends TestLogger {
 				false // don't deploy eagerly but with the first completed memory buffer
 			);
 
-			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
+			final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
 				"TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(),
 				TestInvokableRecordCancel.class.getName(),
 				Collections.singletonList(resultPartitionDeploymentDescriptor),
@@ -1711,4 +1717,56 @@ public class TaskManagerTest extends TestLogger {
 			}
 		}
 	}
+
+	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+		JobID jobId,
+		String jobName,
+		JobVertexID jobVertexId,
+		ExecutionAttemptID executionAttemptId,
+		SerializedValue<ExecutionConfig> serializedExecutionConfig,
+		String taskName,
+		int numberOfKeyGroups,
+		int subtaskIndex,
+		int parallelism,
+		int attemptNumber,
+		Configuration jobConfiguration,
+		Configuration taskConfiguration,
+		String invokableClassName,
+		Collection<ResultPartitionDeploymentDescriptor> producedPartitions,
+		Collection<InputGateDeploymentDescriptor> inputGates,
+		Collection<BlobKey> requiredJarFiles,
+		Collection<URL> requiredClasspaths,
+		int targetSlotNumber) throws IOException {
+
+		JobInformation jobInformation = new JobInformation(
+			jobId,
+			jobName,
+			serializedExecutionConfig,
+			jobConfiguration,
+			requiredJarFiles,
+			requiredClasspaths);
+
+		TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			taskName,
+			parallelism,
+			numberOfKeyGroups,
+			invokableClassName,
+			taskConfiguration);
+
+		SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
+		SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
+
+		return new TaskDeploymentDescriptor(
+			serializedJobInformation,
+			serializedJobVertexInformation,
+			executionAttemptId,
+			subtaskIndex,
+			attemptNumber,
+			targetSlotNumber,
+			null,
+			producedPartitions,
+			inputGates);
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 754361d..276e090 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -32,11 +37,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -44,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.concurrent.Executor;
 
 import static org.mockito.Mockito.mock;
@@ -59,18 +64,22 @@ public class TaskStopTest {
 		TaskInfo taskInfoMock = mock(TaskInfo.class);
 		when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
 
-		TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
-		when(tddMock.getTaskInfo()).thenReturn(taskInfoMock);
-		when(tddMock.getJobID()).thenReturn(mock(JobID.class));
-		when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
-		when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
-		when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
-		when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
-		when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
-		when(tddMock.getInvokableClassName()).thenReturn("className");
-
 		task = new Task(
-			tddMock,
+			mock(JobInformation.class),
+			new TaskInformation(
+				new JobVertexID(),
+				"test task name",
+				1,
+				1,
+				"foobar",
+				new Configuration()),
+			mock(ExecutionAttemptID.class),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			mock(TaskStateHandles.class),
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			mock(NetworkEnvironment.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 382e5c1..1eebe12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -27,11 +27,12 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -718,17 +719,17 @@ public class TaskTest extends TestLogger {
 		}
 	}
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable) {
+	private Task createTask(Class<? extends AbstractInvokable> invokable) throws IOException {
 		return createTask(invokable, new Configuration(), new ExecutionConfig());
 	}
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) {
+	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 		return createTask(invokable, libCache, config, new ExecutionConfig());
 	}
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) {
+	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 		return createTask(invokable, libCache, config, execConfig);
@@ -736,7 +737,7 @@ public class TaskTest extends TestLogger {
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
-			LibraryCacheManager libCache) {
+			LibraryCacheManager libCache) throws IOException {
 
 		return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
 	}
@@ -745,7 +746,7 @@ public class TaskTest extends TestLogger {
 			Class<? extends AbstractInvokable> invokable,
 			LibraryCacheManager libCache,
 			Configuration config,
-			ExecutionConfig execConfig) {
+			ExecutionConfig execConfig) throws IOException {
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
@@ -766,7 +767,7 @@ public class TaskTest extends TestLogger {
 			NetworkEnvironment networkEnvironment,
 			ResultPartitionConsumableNotifier consumableNotifier,
 			PartitionStateChecker partitionStateChecker,
-			Executor executor) {
+			Executor executor) throws IOException {
 		return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionStateChecker, executor, new Configuration(), new ExecutionConfig());
 	}
 	
@@ -777,22 +778,50 @@ public class TaskTest extends TestLogger {
 		ResultPartitionConsumableNotifier consumableNotifier,
 		PartitionStateChecker partitionStateChecker,
 		Executor executor,
-		Configuration config,
-		ExecutionConfig execConfig) {
+		Configuration taskConfig,
+		ExecutionConfig execConfig) throws IOException {
 		
-		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable, config, execConfig);
+		JobID jobId = new JobID();
+		JobVertexID jobVertexId = new JobVertexID();
+		ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
 
 		InputSplitProvider inputSplitProvider = new TaskInputSplitProvider(
 			jobManagerGateway,
-			tdd.getJobID(),
-			tdd.getVertexID(),
-			tdd.getExecutionId(),
+			jobId,
+			jobVertexId,
+			executionAttemptId,
 			new FiniteDuration(60, TimeUnit.SECONDS));
 
 		CheckpointResponder checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
+
+		SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(execConfig);
+
+		JobInformation jobInformation = new JobInformation(
+			jobId,
+			"Test Job",
+			serializedExecutionConfig,
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			"Test Task",
+			1,
+			1,
+			invokable.getName(),
+			taskConfig);
 		
 		return new Task(
-			tdd,
+			jobInformation,
+			taskInformation,
+			executionAttemptId,
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			null,
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,
@@ -809,31 +838,6 @@ public class TaskTest extends TestLogger {
 			executor);
 	}
 
-	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
-			Class<? extends AbstractInvokable> invokable,
-			Configuration taskConfig,
-			ExecutionConfig execConfig) {
-
-		SerializedValue<ExecutionConfig> serializedExecConfig;
-		try {
-			serializedExecConfig = new SerializedValue<>(execConfig);
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		return new TaskDeploymentDescriptor(
-				new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
-				serializedExecConfig,
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(), taskConfig,
-				invokable.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
-	}
-
 	// ------------------------------------------------------------------------
 	// Validation Methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
index d880dc0..9db11d7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -138,11 +138,19 @@ trait TestingTaskManagerLike extends FlinkActor {
       registeredSubmitTaskListeners.put(jobId, sender())
 
     case msg@SubmitTask(tdd) =>
-      registeredSubmitTaskListeners.get(tdd.getJobID) match {
-        case Some(listenerRef) =>
-          listenerRef ! ResponseSubmitTaskListener(tdd)
-        case None =>
-        // Nothing to do
+      try {
+        val jobId = tdd.getSerializedJobInformation.deserializeValue(getClass.getClassLoader)
+          .getJobId
+
+        registeredSubmitTaskListeners.get(jobId) match {
+          case Some(listenerRef) =>
+            listenerRef ! ResponseSubmitTaskListener(tdd)
+          case None =>
+          // Nothing to do
+        }
+      } catch {
+        case e: Exception =>
+          log.error("Could not deserialize the job information.", e)
       }
 
       super.handleMessage(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ee5a203..1f79384 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -27,10 +27,11 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -96,8 +97,7 @@ public class InterruptSensitiveRestoreTest {
 
 		StreamStateHandle lockingHandle = new InterruptLockingStateHandle();
 
-		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, lockingHandle);
-		Task task = createTask(tdd);
+		Task task = createTask(taskConfig, lockingHandle);
 
 		// start the task and wait until it is in "restore"
 		task.startTaskThread();
@@ -120,10 +120,14 @@ public class InterruptSensitiveRestoreTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+	private static Task createTask(
 			Configuration taskConfig,
 			StreamStateHandle state) throws IOException {
 
+		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+				.thenReturn(mock(TaskKvStateRegistry.class));
+
 		ChainedStateHandle<StreamStateHandle> operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
 		List<KeyGroupsStateHandle> keyGroupStateFromBackend = Collections.emptyList();
 		List<KeyGroupsStateHandle> keyGroupStateFromStream = Collections.emptyList();
@@ -131,53 +135,53 @@ public class InterruptSensitiveRestoreTest {
 		List<Collection<OperatorStateHandle>> operatorStateStream = Collections.emptyList();
 
 		TaskStateHandles taskStateHandles = new TaskStateHandles(
-				operatorState,
-				operatorStateBackend,
-				operatorStateStream,
-				keyGroupStateFromBackend,
-				keyGroupStateFromStream);
-
-		return new TaskDeploymentDescriptor(
-				new JobID(),
-				"test job name",
-				new JobVertexID(),
-				new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"test task name",
-				1, 0, 1, 0,
-				new Configuration(),
-				taskConfig,
-				SourceStreamTask.class.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0,
-				taskStateHandles);
-	}
-
-	private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
-				.thenReturn(mock(TaskKvStateRegistry.class));
+			operatorState,
+			operatorStateBackend,
+			operatorStateStream,
+			keyGroupStateFromBackend,
+			keyGroupStateFromStream);
+
+		JobInformation jobInformation = new JobInformation(
+			new JobID(),
+			"test job name",
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"test task name",
+			1,
+			1,
+			SourceStreamTask.class.getName(),
+			taskConfig);
 
 		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkEnvironment,
-				mock(BroadcastVariableManager.class),
-				mock(TaskManagerConnection.class),
-				mock(InputSplitProvider.class),
-				mock(CheckpointResponder.class),
-				new FallbackLibraryCacheManager(),
-				new FileCache(new Configuration()),
-				new TaskManagerRuntimeInfo(
-						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
-				new UnregisteredTaskMetricsGroup(),
-				mock(ResultPartitionConsumableNotifier.class),
-				mock(PartitionStateChecker.class),
-				mock(Executor.class));
+			jobInformation,
+			taskInformation,
+			new ExecutionAttemptID(),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			taskStateHandles,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkEnvironment,
+			mock(BroadcastVariableManager.class),
+			mock(TaskManagerConnection.class),
+			mock(InputSplitProvider.class),
+			mock(CheckpointResponder.class),
+			new FallbackLibraryCacheManager(),
+			new FileCache(new Configuration()),
+			new TaskManagerRuntimeInfo(
+					"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+			new UnregisteredTaskMetricsGroup(),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(PartitionStateChecker.class),
+			mock(Executor.class));
 
 	}
 
@@ -274,4 +278,4 @@ public class InterruptSensitiveRestoreTest {
 			fail("should never be called");
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 73316eb..3fe8a37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -30,10 +30,11 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -48,6 +49,7 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -263,35 +265,46 @@ public class StreamTaskTest {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(),
-				taskConfig.getConfiguration(),
-				invokable.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
+		JobInformation jobInformation = new JobInformation(
+			new JobID(),
+			"Job Name",
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"Test Task",
+			1,
+			1,
+			invokable.getName(),
+			taskConfig.getConfiguration());
 
 		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				network,
-				mock(BroadcastVariableManager.class),
-				mock(TaskManagerConnection.class),
-				mock(InputSplitProvider.class),
-				mock(CheckpointResponder.class),
-				libCache,
-				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
-				new UnregisteredTaskMetricsGroup(),
-				consumableNotifier,
-				partitionStateChecker,
-				executor);
+			jobInformation,
+			taskInformation,
+			new ExecutionAttemptID(),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			new TaskStateHandles(),
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			network,
+			mock(BroadcastVariableManager.class),
+			mock(TaskManagerConnection.class),
+			mock(InputSplitProvider.class),
+			mock(CheckpointResponder.class),
+			libCache,
+			mock(FileCache.class),
+			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+			new UnregisteredTaskMetricsGroup(),
+			consumableNotifier,
+			partitionStateChecker,
+			executor);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 47a0828..ab9c1fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -310,7 +311,11 @@ public class SavepointITCase extends TestLogger {
 
 								LOG.info("Received: " + tdd.toString() + ".");
 
-								tdds.put(tdd.getVertexID(), tdd);
+								TaskInformation taskInformation = tdd
+									.getSerializedTaskInformation()
+									.deserializeValue(getClass().getClassLoader());
+
+								tdds.put(taskInformation.getJobVertexId(), tdd);
 							}
 						}
 						catch (Throwable t) {
@@ -337,7 +342,7 @@ public class SavepointITCase extends TestLogger {
 				assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
 
 				for (TaskDeploymentDescriptor tdd : taskTdds) {
-					SubtaskState subtaskState = taskState.getState(tdd.getIndexInSubtaskGroup());
+					SubtaskState subtaskState = taskState.getState(tdd.getSubtaskIndex());
 
 					assertNotNull(subtaskState);
 


Mime
View raw message