flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-3327] ExecutionConfig to JobGraph
Date Fri, 11 Mar 2016 23:12:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master d0a390f9b -> 0f8d76c6f


http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index adaff29..1cd01ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -105,7 +106,7 @@ public class JobSubmitTest {
 			// create a simple job graph
 			JobVertex jobVertex = new JobVertex("Test Vertex");
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-			JobGraph jg = new JobGraph("test job", jobVertex);
+			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
 
 			// request the blob port from the job manager
 			Future<Object> future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
@@ -169,7 +170,7 @@ public class JobSubmitTest {
 			};
 
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-			JobGraph jg = new JobGraph("test job", jobVertex);
+			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
 
 			// submit the job
 			Future<Object> submitFuture = jmGateway.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 1733406..dfb0b91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -103,7 +104,7 @@ public class SlotCountExceedingParallelismTest {
 				DistributionPattern.ALL_TO_ALL,
 				ResultPartitionType.BLOCKING);
 
-		final JobGraph jobGraph = new JobGraph(jobName, sender, receiver);
+		final JobGraph jobGraph = new JobGraph(jobName, new ExecutionConfig(), sender, receiver);
 
 		// We need to allow queued scheduling, because there are not enough slots available
 		// to run all tasks at once. We queue tasks and then let them finish/consume the blocking

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 753e7be..ca2ecf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -37,7 +38,7 @@ public class StandaloneSubmittedJobGraphStoreTest {
 		StandaloneSubmittedJobGraphStore jobGraphs = new StandaloneSubmittedJobGraphStore();
 
 		SubmittedJobGraph jobGraph = new SubmittedJobGraph(
-				new JobGraph("testNoOps"),
+				new JobGraph("testNoOps", new ExecutionConfig()),
 				new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
 		assertEquals(0, jobGraphs.recoverJobGraphs().size());

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 356ba36..5e53596 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -260,7 +261,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 	// ---------------------------------------------------------------------------------------------
 
 	private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) {
-		final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
+		final JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph", new ExecutionConfig());
 
 		final JobVertex jobVertex = new JobVertex("Test JobVertex");
 		jobVertex.setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index eb4d96f..07fc2c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import com.google.common.collect.Lists;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -112,6 +113,7 @@ public class ScheduleOrUpdateConsumersTest {
 
 		final JobGraph jobGraph = new JobGraph(
 				"Mixed pipelined and blocking result",
+				new ExecutionConfig(),
 				sender,
 				pipelinedReceiver,
 				blockingReceiver);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index c490a64..f14d62f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -268,6 +269,6 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		return new JobGraph("Blocking test job", sender, receiver);
+		return new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 3fcc425..233dace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -44,12 +45,18 @@ public class DummyEnvironment implements Environment {
 	private final TaskInfo taskInfo;
 	private final JobID jobId = new JobID();
 	private final JobVertexID jobVertexId = new JobVertexID();
+	private final ExecutionConfig executionConfig = new ExecutionConfig();
 
 	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
 		this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0);
 	}
 
 	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	@Override
 	public JobID getJobID() {
 		return jobId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index fa97210..d29b206 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -65,6 +66,8 @@ public class MockEnvironment implements Environment {
 	
 	private final TaskInfo taskInfo;
 	
+	private final ExecutionConfig executionConfig;
+
 	private final MemoryManager memManager;
 
 	private final IOManager ioManager;
@@ -96,6 +99,7 @@ public class MockEnvironment implements Environment {
 
 		this.memManager = new MemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
+		this.executionConfig = new ExecutionConfig();
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
 
@@ -186,6 +190,11 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return this.executionConfig;
+	}
+
+	@Override
 	public JobID getJobID() {
 		return this.jobID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e5ff7b1..9d33920 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -148,7 +149,7 @@ public class TaskAsyncCallTest {
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
-				"Test Task", 0, 1, 0,
+				new ExecutionConfig(), "Test Task", 0, 1, 0,
 				new Configuration(), new Configuration(),
 				CheckpointsInOrderInvokable.class.getName(),
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index a3d1883..e7f4c5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -73,7 +74,7 @@ public class TaskCancelTest {
 			flink.start();
 
 			// Setup
-			final JobGraph jobGraph = new JobGraph("Cancel Big Union");
+			final JobGraph jobGraph = new JobGraph("Cancel Big Union", new ExecutionConfig());
 
 			JobVertex[] sources = new JobVertex[numberOfSources];
 			SlotSharingGroup group = new SlotSharingGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index b80cb0e..56c2e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -157,9 +158,11 @@ public class TaskManagerTest extends TestLogger {
 				final JobID jid = new JobID();
 				final JobVertexID vid = new JobVertexID();
 				final ExecutionAttemptID eid = new ExecutionAttemptID();
+				final ExecutionConfig executionConfig = new ExecutionConfig();
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, 0,
-						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, executionConfig,
+						"TestTask", 2, 7, 0, new Configuration(), new Configuration(),
+						TestInvokableCorrect.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
@@ -258,13 +261,15 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1,
+						new ExecutionConfig(), "TestTask1", 1, 5, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2,
+						new ExecutionConfig(), "TestTask2", 2, 7, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -390,14 +395,14 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
-						new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, new ExecutionConfig(),
+						"TestTask1", 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
-						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, new ExecutionConfig(),
+						"TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
@@ -516,13 +521,15 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1,
+						new ExecutionConfig(), "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2,
+						new ExecutionConfig(), "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -615,12 +622,14 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1,
+						new ExecutionConfig(), "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
 						Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2,
+						new ExecutionConfig(), "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
@@ -754,12 +763,14 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1,
+						new ExecutionConfig(), "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2,
+						new ExecutionConfig(), "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
@@ -897,7 +908,8 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						jid, vid, eid, "Receiver", 0, 1, 0,
+						jid, vid, eid,
+						new ExecutionConfig(), "Receiver", 0, 1, 0,
 						new Configuration(), new Configuration(),
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -991,7 +1003,7 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						jid, vid, eid, "Receiver", 0, 1, 0,
+						jid, vid, eid, new ExecutionConfig(), "Receiver", 0, 1, 0,
 						new Configuration(), new Configuration(),
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1067,6 +1079,7 @@ public class TaskManagerTest extends TestLogger {
 						new JobID(),
 						new JobVertexID(),
 						new ExecutionAttemptID(),
+						new ExecutionConfig(),
 						"Task",
 						0,
 						1,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index a60e074..aa37d47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -61,6 +62,7 @@ public class TaskStopTest {
 		when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
 		when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
 		when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+		when(tddMock.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
 		when(tddMock.getInvokableClassName()).thenReturn("className");
 
 		task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 45ca364..034681e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -627,7 +628,7 @@ public class TaskTest {
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
 		return new TaskDeploymentDescriptor(
 				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
-				"Test Task", 0, 1, 0,
+				new ExecutionConfig(), "Test Task", 0, 1, 0,
 				new Configuration(), new Configuration(),
 				invokable.getName(),
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index dbd87d0..ef3dae4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph
 
-import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
@@ -50,13 +50,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
         sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
         sender.setParallelism(20)
 
-        val jobGraph = new JobGraph("Pointwise job", sender)
+        val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender)
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),
+          new ExecutionConfig,
           AkkaUtils.getDefaultTimeout,
           new NoRestartStrategy())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index a0c144a..f52d37e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
@@ -67,7 +68,7 @@ class CoLocationConstraintITCase(_system: ActorSystem)
 
       receiver.setStrictlyCoLocatedWith(sender)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index ec54b7e..894ba38 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
-import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, SavepointCoordinator}
 import org.apache.flink.runtime.client.JobExecutionException
@@ -68,7 +68,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(2)
       vertex.setInvokableClass(classOf[BlockingNoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -110,7 +110,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -145,7 +145,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(num_tasks)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test job", vertex)
+      val jobGraph = new JobGraph("Test job", new ExecutionConfig(), vertex)
       jobGraph.setAllowQueuedScheduling(true)
 
       val cluster = TestingUtils.startTestingCluster(10)
@@ -181,7 +181,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -216,7 +216,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -253,7 +253,8 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
+        sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -298,7 +299,8 @@ class JobManagerITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      val jobGraph = new JobGraph("Bipartite Job", new ExecutionConfig(),
+        sender1, receiver, sender2)
 
       val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -340,7 +342,8 @@ class JobManagerITCase(_system: ActorSystem)
       forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
       receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
+      val jobGraph = new JobGraph("Forwarding Job", new ExecutionConfig(),
+        sender, forwarder, receiver)
 
       jobGraph.setScheduleMode(ScheduleMode.ALL)
 
@@ -376,7 +379,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -424,7 +427,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -469,7 +472,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -509,7 +512,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -557,7 +560,7 @@ class JobManagerITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -600,7 +603,8 @@ class JobManagerITCase(_system: ActorSystem)
       source.setParallelism(num_tasks)
       sink.setParallelism(num_tasks)
 
-      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink)
+      val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition",
+        new ExecutionConfig(), source, sink)
 
       val cluster = TestingUtils.startTestingCluster(2*num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -626,12 +630,12 @@ class JobManagerITCase(_system: ActorSystem)
       val vertex = new JobVertex("Test Vertex")
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph1 = new JobGraph("Test Job", vertex)
+      val jobGraph1 = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000)
       slowVertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph2 = new JobGraph("Long running Job", slowVertex)
+      val jobGraph2 = new JobGraph("Long running Job", new ExecutionConfig(), slowVertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -680,7 +684,7 @@ class JobManagerITCase(_system: ActorSystem)
       vertex.setParallelism(1)
       vertex.setInvokableClass(classOf[NoOpInvokable])
 
-      val jobGraph = new JobGraph("Test Job", vertex)
+      val jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex)
 
       val cluster = TestingUtils.startTestingCluster(1)
       val jm = cluster.getLeaderGateway(1 seconds)
@@ -778,7 +782,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
 
           // Submit job w/o checkpointing configured
           jobManager.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), testActor)
@@ -811,7 +815,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -864,7 +868,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),
@@ -922,7 +926,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           val jobVertex = new JobVertex("Blocking vertex")
           jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
-          val jobGraph = new JobGraph(jobVertex)
+          val jobGraph = new JobGraph(new ExecutionConfig(), jobVertex)
           jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
             java.util.Collections.emptyList(),
             java.util.Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 41b6702..ea42cd1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
@@ -82,7 +83,10 @@ class RecoveryITCase(_system: ActorSystem)
 
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s")
       cluster.start()
@@ -126,7 +130,10 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
       cluster.start()
@@ -170,7 +177,10 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      val executionConfig = new ExecutionConfig()
+      executionConfig.setNumberOfExecutionRetries(1);
+
+      val jobGraph = new JobGraph("Pointwise job", executionConfig, sender, receiver)
 
       val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
       cluster.start()

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index a6d60dd..4d320ea 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
@@ -65,7 +66,7 @@ class SlotSharingITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -109,7 +110,8 @@ class SlotSharingITCase(_system: ActorSystem)
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
       receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
-      val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
+      val jobGraph = new JobGraph("Bipartite job", new ExecutionConfig(),
+        sender1, sender2, receiver)
 
       val cluster = TestingUtils.startTestingCluster(num_tasks)
       val jmGateway = cluster.getLeaderGateway(1 seconds)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 49a1c95..c108596 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
@@ -28,7 +29,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -67,7 +67,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
@@ -116,7 +116,7 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
       sender.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e3e1ac6..c339a07 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -24,7 +24,6 @@ import com.google.common.hash.Hashing;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -52,12 +51,10 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -112,7 +109,7 @@ public class StreamingJobGraphGenerator {
 	}
 
 	public JobGraph createJobGraph() {
-		jobGraph = new JobGraph(streamGraph.getJobName());
+		jobGraph = new JobGraph(streamGraph.getJobName(), streamGraph.getExecutionConfig());
 
 		// make sure that all vertices start immediately
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
@@ -133,12 +130,6 @@ public class StreamingJobGraphGenerator {
 
 		configureRestartStrategy();
 
-		try {
-			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
-		} catch (IOException e) {
-			throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
-		}
-		
 		return jobGraph;
 	}
 
@@ -494,7 +485,8 @@ public class StreamingJobGraphGenerator {
 	}
 
 	private void configureRestartStrategy() {
-		jobGraph.setRestartStrategyConfiguration(streamGraph.getExecutionConfig().getRestartStrategy());
+		jobGraph.getExecutionConfig().setRestartStrategy(
+			streamGraph.getExecutionConfig().getRestartStrategy());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index 9f75727..0517576 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -42,7 +42,7 @@ public class RestartStrategyTest {
 		StreamGraph graph = env.getStreamGraph();
 		JobGraph jobGraph = graph.getJobGraph();
 
-		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
@@ -64,7 +64,7 @@ public class RestartStrategyTest {
 		StreamGraph graph = env.getStreamGraph();
 		JobGraph jobGraph = graph.getJobGraph();
 
-		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration);
@@ -86,7 +86,7 @@ public class RestartStrategyTest {
 		StreamGraph graph = env.getStreamGraph();
 		JobGraph jobGraph = graph.getJobGraph();
 
-		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getExecutionConfig().getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 8a814ff..e9aec48 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -77,10 +77,16 @@ public class StreamingJobGraphGeneratorTest {
 		config.setParallelism(dop);
 		
 		JobGraph jobGraph = compiler.createJobGraph();
-		
+
+		final String EXEC_CONFIG_KEY = "runtime.config";
+
+		InstantiationUtil.writeObjectToConfig(jobGraph.getExecutionConfig(),
+			jobGraph.getJobConfiguration(),
+			EXEC_CONFIG_KEY);
+
 		ExecutionConfig executionConfig = InstantiationUtil.readObjectFromConfig(
 				jobGraph.getJobConfiguration(),
-				ExecutionConfig.CONFIG_KEY,
+				EXEC_CONFIG_KEY,
 				Thread.currentThread().getContextClassLoader());
 		
 		assertNotNull(executionConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 184f87e..732d3e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -134,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
 			jobId,
 			jobName,
 			cfg,
+			new ExecutionConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new ArrayList<BlobKey>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index c4b74e8..f91353e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -90,8 +91,10 @@ public class StreamMockEnvironment implements Environment {
 
 	private final int bufferSize;
 
-	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
-									MockInputSplitProvider inputSplitProvider, int bufferSize) {
+	private final ExecutionConfig executionConfig;
+
+	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
+									long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.taskInfo = new TaskInfo("", 0, 1, 0);
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
@@ -103,9 +106,15 @@ public class StreamMockEnvironment implements Environment {
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
 
+		this.executionConfig = executionConfig;
 		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
 	}
 
+	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
+								 MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		this(jobConfig, taskConfig, null, memorySize, inputSplitProvider, bufferSize);
+	}
+
 	public void addInputGate(InputGate gate) {
 		inputs.add(gate);
 	}
@@ -206,6 +215,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return this.executionConfig;
+	}
+
+	@Override
 	public JobID getJobID() {
 		return this.jobID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index 1830054..ed1dd60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -77,6 +78,11 @@ public class StreamTaskAsyncCheckpointTest {
 			testHarness.bufferSize) {
 
 			@Override
+			public ExecutionConfig getExecutionConfig() {
+				return testHarness.executionConfig;
+			}
+
+			@Override
 			public void acknowledgeCheckpoint(long checkpointId) {
 				super.acknowledgeCheckpoint(checkpointId);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7cc58b5..7f4492a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import akka.actor.ActorRef;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -134,7 +135,7 @@ public class StreamTaskTest {
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
-				"Test Task", 0, 1, 0,
+				new ExecutionConfig(), "Test Task", 0, 1, 0,
 				new Configuration(),
 				taskConfig.getConfiguration(),
 				invokable.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 8dc7edd..e750f6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -104,12 +103,6 @@ public class StreamTaskTestHarness<OUT> {
 		this.jobConfig = new Configuration();
 		this.taskConfig = new Configuration();
 		this.executionConfig = new ExecutionConfig();
-		
-		try {
-			InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
 
 		streamConfig = new StreamConfig(taskConfig);
 		streamConfig.setChainStart();
@@ -156,7 +149,8 @@ public class StreamTaskTestHarness<OUT> {
 	 * Task thread to finish running.
 	 */
 	public void invoke() throws Exception {
-		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig,
+			memorySize, new MockInputSplitProvider(), bufferSize);
 		task.setEnvironment(mockEnv);
 
 		initializeInputs();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..f6c22d4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.failingPrograms;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -64,7 +65,7 @@ public class JobSubmissionFailsITCase {
 			
 			final JobVertex jobVertex = new JobVertex("Working job vertex.");
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-			workingJobGraph = new JobGraph("Working testing job", jobVertex);
+			workingJobGraph = new JobGraph("Working testing job", new ExecutionConfig(), jobVertex);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -115,7 +116,7 @@ public class JobSubmissionFailsITCase {
 			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
 			failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 
-			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
+			final JobGraph failingJobGraph = new JobGraph("Failing testing job", new ExecutionConfig(), failingJobVertex);
 
 			try {
 				submitJob(failingJobGraph);
@@ -140,7 +141,7 @@ public class JobSubmissionFailsITCase {
 	@Test
 	public void testSubmitEmptyJobGraph() {
 		try {
-			final JobGraph jobGraph = new JobGraph("Testing job");
+			final JobGraph jobGraph = new JobGraph("Testing job", new ExecutionConfig());
 	
 			try {
 				submitJob(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index e5a494b..c1bd5e2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -68,6 +68,39 @@ public class MapITCase extends MultipleProgramsTestBase {
 		compareResultAsText(result, expected);
 	}
 
+	@Test
+	public void testRuntimeContextAndExecutionConfigParams() throws Exception {
+		/*
+		 * Test that the ExecutionConfig set on the environment is visible via the RuntimeContext inside an identity map
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setNumberOfExecutionRetries(1000);
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> identityMapDs = ds.
+			map(new RichMapFunction<String, String>() {
+				@Override
+				public String map(String value) throws Exception {
+					Assert.assertTrue(1000 == getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries());
+					return value;
+				}
+			});
+
+		List<String> result = identityMapDs.collect();
+
+		String expected = "Hi\n" +
+			"Hello\n" +
+			"Hello world\n" +
+			"Hello world, how are you?\n" +
+			"I am fine.\n" +
+			"Luke Skywalker\n" +
+			"Random comment\n" +
+			"LOL\n";
+
+		compareResultAsText(result, expected);
+	}
+
 	public static class Mapper1 implements MapFunction<String, String> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 5783fcc..e8ad527 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.recovery;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -366,7 +367,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			// BLocking JobGraph
 			JobVertex blockingVertex = new JobVertex("Blocking vertex");
 			blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
-			JobGraph jobGraph = new JobGraph(blockingVertex);
+			JobGraph jobGraph = new JobGraph(new ExecutionConfig(), blockingVertex);
 
 			// Submit the job in detached mode
 			leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 5080aa2..32423be 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -25,6 +25,7 @@ import akka.actor.UntypedActor;
 import akka.testkit.TestActorRef;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -428,7 +429,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	 * Creates a simple blocking JobGraph.
 	 */
 	private static JobGraph createBlockingJobGraph() {
-		JobGraph jobGraph = new JobGraph("Blocking program");
+		JobGraph jobGraph = new JobGraph("Blocking program", new ExecutionConfig());
 
 		JobVertex jobVertex = new JobVertex("Blocking Vertex");
 		jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index aada364..a870578 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -96,7 +97,7 @@ public class NetworkStackThroughputITCase {
 
 		private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
 										boolean isSlowReceiver, int numSubtasks) {
-			JobGraph jobGraph = new JobGraph("Speed Test");
+			JobGraph jobGraph = new JobGraph("Speed Test", new ExecutionConfig());
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
 			JobVertex producer = new JobVertex("Speed Test Producer");

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 45ee839..09f9cac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -167,7 +168,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
+		final JobGraph graph = new JobGraph("Blocking test job", new ExecutionConfig(), sender, receiver);
 
 		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 2586a27..48888a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -178,7 +179,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 		sender.setParallelism(2);
 		sender.setInvokableClass(StoppableInvokable.class);
 
-		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
 		final JobID jid = jobGraph.getJobID();
 
 		cluster.submitJobDetached(jobGraph);
@@ -206,7 +207,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 		sender.setParallelism(2);
 		sender.setInvokableClass(StoppableInvokable.class);
 
-		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
 		final JobID jid = jobGraph.getJobID();
 
 		cluster.submitJobDetached(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index fddf639..2265b3b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -93,12 +94,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", sender)
+      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
 
       val cluster = startDeathwatchCluster(num_slots / 2, 2)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
index 09af430..9aa1e94 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import akka.actor.ActorSystem
 import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
@@ -64,7 +65,7 @@ class JobManagerLeaderSessionIDITSuite(_system: ActorSystem)
     val sender = new JobVertex("BlockingSender");
     sender.setParallelism(numSlots)
     sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
-    val jobGraph = new JobGraph("TestJob", sender)
+    val jobGraph = new JobGraph("TestJob", new ExecutionConfig(), sender)
 
     val oldSessionID = UUID.randomUUID()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 869af82..88d760d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.scala.runtime.taskmanager
 
-import akka.actor.Status.{Failure, Success}
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.api.common.ExecutionConfig
 
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{RegisteredAtJobMan
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
 
 import org.junit.runner.RunWith
@@ -100,7 +99,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -152,7 +151,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setParallelism(num_tasks)
       receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobGraph = new JobGraph("Pointwise Job", new ExecutionConfig(), sender, receiver)
       val jobID = jobGraph.getJobID
 
       val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
@@ -191,12 +190,12 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val sender = new JobVertex("BlockingSender")
       sender.setParallelism(num_slots)
       sender.setInvokableClass(classOf[BlockingNoOpInvokable])
-      val jobGraph = new JobGraph("Blocking Testjob", sender)
+      val jobGraph = new JobGraph("Blocking Testjob", new ExecutionConfig(), sender)
 
       val noOp = new JobVertex("NoOpInvokable")
       noOp.setParallelism(num_slots)
       noOp.setInvokableClass(classOf[NoOpInvokable])
-      val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
+      val jobGraph2 = new JobGraph("NoOp Testjob", new ExecutionConfig(), noOp)
 
       val cluster = createDeathwatchCluster(num_slots/2, 2)
 


Mime
View raw message