flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor
Date Thu, 02 Mar 2017 13:03:27 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 44f48b34e -> ba5aa10b9


[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

Previously, the scheduler was set when calling ExecutionGraph.scheduleForExecution(). This
has the disadvantage that the ExecutionGraph has no scheduler set if something else
goes wrong before the scheduleForExecution call. Consequently, the job will be stuck
in a restart loop because the recovery will fail if there is no Scheduler set. In
order to solve the problem, the Scheduler is now passed to the ExecutionGraph when
it is created.

This closes #3441.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba5aa10b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba5aa10b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba5aa10b

Branch: refs/heads/release-1.1
Commit: ba5aa10b9bd20f6137134fe8ef8c882ce9c40a7c
Parents: 44f48b3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 28 15:20:47 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Mar 2 14:02:59 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 19 ++---
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java | 24 +++---
 .../ExecutionGraphConstructionTest.java         | 25 ++++--
 .../ExecutionGraphDeploymentTest.java           | 44 +++++-----
 .../ExecutionGraphMetricsTest.java              |  3 +-
 .../ExecutionGraphRestartTest.java              | 39 +++++----
 .../ExecutionGraphSignalsTest.java              |  4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  5 +-
 .../ExecutionStateProgressTest.java             |  4 +-
 .../executiongraph/LocalInputSplitsTest.java    | 41 ++++-----
 .../executiongraph/PointwisePatternTest.java    | 22 +++--
 .../TerminalStateDeadlockTest.java              |  5 +-
 .../VertexLocationConstraintTest.java           | 88 +++++++++++---------
 .../executiongraph/VertexSlotSharingTest.java   |  4 +-
 .../TaskManagerLossFailsTasksTest.scala         |  5 +-
 .../partitioner/RescalePartitionerTest.java     |  2 +
 17 files changed, 195 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bfd93e4..dde9ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -221,6 +221,7 @@ public class ExecutionGraph implements Serializable {
 
 	/** Checkpoint stats tracker seperate from the coordinator in order to be
 	 * available after archiving. */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The execution context which is used to execute futures. */
@@ -252,7 +253,8 @@ public class ExecutionGraph implements Serializable {
 			Configuration jobConfig,
 			SerializedValue<ExecutionConfig> serializedConfig,
 			FiniteDuration timeout,
-			RestartStrategy restartStrategy) throws IOException {
+			RestartStrategy restartStrategy,
+			Scheduler scheduler) throws IOException {
 		this(
 			futureExecutor,
 			ioExecutor,
@@ -264,6 +266,7 @@ public class ExecutionGraph implements Serializable {
 			restartStrategy,
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
+			scheduler,
 			ExecutionGraph.class.getClassLoader(),
 			new UnregisteredMetricsGroup()
 		);
@@ -280,13 +283,13 @@ public class ExecutionGraph implements Serializable {
 			RestartStrategy restartStrategy,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
+			Scheduler scheduler,
 			ClassLoader userClassLoader,
 			MetricGroup metricGroup) throws IOException {
 
 		checkNotNull(jobId);
 		checkNotNull(jobName);
 		checkNotNull(jobConfig);
-		checkNotNull(userClassLoader);
 
 		this.jobInformation = new JobInformation(
 			jobId,
@@ -303,7 +306,8 @@ public class ExecutionGraph implements Serializable {
 		this.futureExecutionContext = ExecutionContext$.MODULE$.fromExecutor(futureExecutor);
 		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
-		this.userClassLoader = userClassLoader;
+		this.scheduler = Preconditions.checkNotNull(scheduler, "scheduler");
+		this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");
 
 		this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
 		this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
@@ -749,17 +753,12 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public void scheduleForExecution(Scheduler scheduler) throws JobException {
+	public void scheduleForExecution() throws JobException {
 		if (scheduler == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
 		}
 
-		if (this.scheduler != null && this.scheduler != scheduler) {
-			throw new IllegalArgumentException("Cannot use different schedulers for the same job");
-		}
-
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-			this.scheduler = scheduler;
 
 			switch (scheduleMode) {
 
@@ -976,7 +975,7 @@ public class ExecutionGraph implements Serializable {
 				}
 			}
 
-			scheduleForExecution(scheduler);
+			scheduleForExecution();
 		}
 		catch (Throwable t) {
 			LOG.warn("Failed to restart the job.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1720d94..c0ede0d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1156,6 +1156,7 @@ class JobManager(
               restartStrategy,
               jobGraph.getUserJarBlobKeys,
               jobGraph.getClasspaths,
+              scheduler,
               userCodeLoader,
               jobMetrics)
 
@@ -1366,7 +1367,7 @@ class JobManager(
             // the job.
             log.info(s"Scheduling job $jobId ($jobName).")
 
-            executionGraph.scheduleForExecution(scheduler)
+            executionGraph.scheduleForExecution()
           } else {
             // Remove the job graph. Otherwise it will be lingering around and possibly removed
from
             // ZooKeeper by this JM.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index c23aaf8..cbbe80e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
@@ -125,18 +126,19 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CheckpointIDCounter counter,
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
 			TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"test",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				new FiniteDuration(1, TimeUnit.DAYS),
-				new NoRestartStrategy(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				ClassLoader.getSystemClassLoader(),
-				new UnregisteredMetricsGroup());
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"test",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			new FiniteDuration(1, TimeUnit.DAYS),
+			new NoRestartStrategy(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
+			ClassLoader.getSystemClassLoader(),
+			new UnregisteredMetricsGroup());
 
 		executionGraph.enableSnapshotCheckpointing(
 				100,

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index bf3a17c..ea25dca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 			fail("Attached wrong jobgraph");
@@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			try {
 				eg.attachJobGraph(ordered);
 			}
@@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 
 			try {
 				eg.attachJobGraph(ordered);
@@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			
 			eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cc0ffba..5dd06b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -100,7 +99,8 @@ public class ExecutionGraphDeploymentTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -315,6 +315,14 @@ public class ExecutionGraphDeploymentTest {
 
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 
+		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+		for (int i = 0; i < dop1; i++) {
+			scheduler.newInstanceAvailable(
+				ExecutionGraphTestUtils.getInstance(
+					new ExecutionGraphTestUtils.SimpleActorGateway(
+						TestingUtils.directExecutionContext())));
+		}
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.directExecutionContext(),
@@ -324,24 +332,18 @@ public class ExecutionGraphDeploymentTest {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			scheduler);
 
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-		for (int i = 0; i < dop1; i++) {
-			scheduler.newInstanceAvailable(
-				ExecutionGraphTestUtils.getInstance(
-					new ExecutionGraphTestUtils.SimpleActorGateway(
-						TestingUtils.directExecutionContext())));
-		}
 		assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
 
 		// schedule, this triggers mock deployment
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
 		eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
@@ -359,6 +361,14 @@ public class ExecutionGraphDeploymentTest {
 		v1.setInvokableClass(BatchTask.class);
 		v2.setInvokableClass(BatchTask.class);
 
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		for (int i = 0; i < dop1 + dop2; i++) {
+			scheduler.newInstanceAvailable(
+				ExecutionGraphTestUtils.getInstance(
+					new ExecutionGraphTestUtils.SimpleActorGateway(
+						TestingUtils.directExecutionContext())));
+		}
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.directExecutionContext(),
@@ -368,24 +378,18 @@ public class ExecutionGraphDeploymentTest {
 			new Configuration(), 
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			scheduler);
 		
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		for (int i = 0; i < dop1 + dop2; i++) {
-			scheduler.newInstanceAvailable(
-					ExecutionGraphTestUtils.getInstance(
-							new ExecutionGraphTestUtils.SimpleActorGateway(
-									TestingUtils.directExecutionContext())));
-		}
 		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
 
 		// schedule, this triggers mock deployment
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
 		assertEquals(dop1 + dop2, executions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 3e6aba5..29e1c14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -144,6 +144,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			testingRestartStrategy,
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
+			scheduler,
 			getClass().getClassLoader(),
 			metricGroup);
 
@@ -161,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		// start execution
-		executionGraph.scheduleForExecution(scheduler);
+		executionGraph.scheduleForExecution();
 
 		assertTrue(0L == restartingTime.getValue());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 659a912..db41eb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -112,12 +112,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		
 		//initiate and schedule job
 		JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler);
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 		
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		
 		//sanity checks
@@ -235,7 +235,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			// We want to manually control the restart and delay
-			new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
+			new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE),
+			scheduler);
 
 		JobVertex jobVertex = new JobVertex("NoOpInvokable");
 		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
@@ -247,7 +248,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.CREATED, executionGraph.getState());
 
-		executionGraph.scheduleForExecution(scheduler);
+		executionGraph.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, executionGraph.getState());
 
@@ -378,12 +379,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
 		JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler);
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
@@ -446,13 +447,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000), scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		// Fail right after cancel (for example with concurrent slot release)
@@ -491,13 +492,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000), scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		// Fail right after cancel (for example with concurrent slot release)
@@ -546,13 +547,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			controllableRestartStrategy);
+			controllableRestartStrategy,
+			scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -650,7 +652,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
-		ExecutionGraph eg = newExecutionGraph(restartStrategy);
+		ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
 		if (isSpy) {
 			eg = spy(eg);
 		}
@@ -658,7 +660,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		return new Tuple2<>(eg, instance);
 	}
@@ -670,7 +672,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		return groupVertex;
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws
IOException {
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler
scheduler) throws IOException {
 		return new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
 			TestingUtils.defaultExecutionContext(),
@@ -679,7 +681,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			restartStrategy);
+			restartStrategy,
+			scheduler);
+	}
+
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws
IOException {
+		return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean
haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index fde967e..fab6505 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		eg.attachJobGraph(ordered);
 
 		f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 89d8a9b..cf0136d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
@@ -56,6 +57,7 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 public class ExecutionGraphTestUtils {
 
@@ -180,7 +182,8 @@ public class ExecutionGraphTestUtils {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
 		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
 				AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 955c9db..64e1e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
@@ -58,7 +59,8 @@ public class ExecutionStateProgressTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			graph.attachJobGraph(Collections.singletonList(ajv));
 
 			setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 11dad92..b5e04cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -268,20 +268,7 @@ public class LocalInputSplitsTest {
 			vertex.setInputSplitSource(new TestInputSplitSource(splits));
 			
 			JobGraph jobGraph = new JobGraph("test job", vertex);
-			
-			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				TestingUtils.defaultExecutionContext(),
-				jobGraph.getJobID(),
-				jobGraph.getName(),  
-				jobGraph.getJobConfiguration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				TIMEOUT,
-				new NoRestartStrategy());
-			
-			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-			eg.setQueuedSchedulingAllowed(false);
-			
+
 			// create a scheduler with 6 instances where always two are on the same host
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 			Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
@@ -297,7 +284,21 @@ public class LocalInputSplitsTest {
 			scheduler.newInstanceAvailable(i5);
 			scheduler.newInstanceAvailable(i6);
 			
-			eg.scheduleForExecution(scheduler);
+			ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jobGraph.getJobID(),
+				jobGraph.getName(),  
+				jobGraph.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				TIMEOUT,
+				new NoRestartStrategy(),
+				scheduler);
+			
+			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			eg.setQueuedSchedulingAllowed(false);
+			
+			eg.scheduleForExecution();
 			
 			ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
 			assertEquals(6, tasks.length);
@@ -334,6 +335,8 @@ public class LocalInputSplitsTest {
 		vertex.setInputSplitSource(new TestInputSplitSource(splits));
 		
 		JobGraph jobGraph = new JobGraph("test job", vertex);
+
+		Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
 		
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
@@ -343,14 +346,14 @@ public class LocalInputSplitsTest {
 			jobGraph.getJobConfiguration(),
 				new SerializedValue<>(new ExecutionConfig()),
 			TIMEOUT,
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			scheduler);
 		
 		eg.setQueuedSchedulingAllowed(false);
 		
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-		
-		Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
-		eg.scheduleForExecution(scheduler);
+
+		eg.scheduleForExecution();
 		
 		ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
 		assertEquals(parallelism, tasks.length);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0e147e3..b59dcda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,7 +74,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -119,7 +121,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -166,7 +169,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -214,7 +218,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -260,7 +265,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -326,7 +332,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -383,7 +390,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 504822b..11e382f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -190,11 +190,12 @@ public class TerminalStateDeadlockTest {
 				EMPTY_CONFIG,
 				new SerializedValue<>(new ExecutionConfig()),
 				TIMEOUT,
-				new FixedDelayRestartStrategy(1, 0));
+				new FixedDelayRestartStrategy(1, 0),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 		}
 
 		@Override
-		public void scheduleForExecution(Scheduler scheduler) {
+		public void scheduleForExecution() {
 			// notify that we are done with the "restarting"
 			synchronized (this) {
 				done = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index b160561..67e3ede 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -88,7 +88,8 @@ public class VertexLocationConstraintTest {
 				jg.getJobConfiguration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				timeout,
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -155,14 +156,15 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -233,14 +235,15 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -302,14 +305,15 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -373,14 +377,15 @@ public class VertexLocationConstraintTest {
 			jobVertex2.setSlotSharingGroup(sharingGroup);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
@@ -415,14 +420,15 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(Collections.singletonList(vertex));
 			
 			ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 27708a2..5c85592 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -87,7 +88,8 @@ public class VertexSlotSharingTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 7c77ce7..7470888 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -61,13 +61,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
           new Configuration(),
           new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
-          new NoRestartStrategy())
+          new NoRestartStrategy(),
+          scheduler)
 
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
         eg.getState should equal(JobStatus.CREATED)
 
-        eg.scheduleForExecution(scheduler)
+        eg.scheduleForExecution()
         eg.getState should equal(JobStatus.RUNNING)
 
         instance1.markDead()

http://git-wip-us.apache.org/repos/asf/flink/blob/ba5aa10b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 03e3535..9660c9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -144,6 +145,7 @@ public class RescalePartitionerTest extends TestLogger {
 			new NoRestartStrategy(),
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ExecutionGraph.class.getClassLoader(),
 			new UnregisteredMetricsGroup());
 		try {


Mime
View raw message