flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/4] flink git commit: [FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager
Date Tue, 22 Nov 2016 22:09:41 GMT
[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

The provided ExecutorService will no longer be closed by the JobManager. Instead, its
lifecycle is managed by the component that created it, outside of the JobManager. This
gives nicer behaviour because it better separates responsibilities.

This closes #2820.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae4b274a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae4b274a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae4b274a

Branch: refs/heads/master
Commit: ae4b274a9919d01a236df4e819a0a07c5d8543ac
Parents: 698e53e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Nov 16 18:33:54 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 22 23:00:16 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 18 ++++-
 .../clusterframework/MesosJobManager.scala      |  8 +--
 .../BackPressureStatsTrackerITCase.java         |  2 +-
 .../StackTraceSampleCoordinatorITCase.java      |  2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../flink/runtime/util/NamedThreadFactory.java  | 58 ++++++++++++++++
 .../ContaineredJobManager.scala                 |  8 +--
 .../flink/runtime/jobmanager/JobManager.scala   | 73 ++++++++++++--------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 10 +--
 .../runtime/jobmanager/JobManagerTest.java      | 48 +++++++------
 .../flink/runtime/jobmanager/JobSubmitTest.java |  9 +--
 .../resourcemanager/ClusterShutdownITCase.java  |  4 +-
 .../resourcemanager/ResourceManagerITCase.java  |  4 +-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../testingUtils/TestingJobManager.scala        |  6 +-
 .../runtime/testingUtils/TestingUtils.scala     | 60 ++++------------
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  9 ++-
 .../flink/yarn/YarnApplicationMasterRunner.java | 19 ++++-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  8 +--
 25 files changed, 229 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5ec39c2..09f87bd 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -45,8 +45,10 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -66,6 +68,8 @@ import java.net.URL;
 import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.mesos.Utils.uri;
@@ -75,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is the executable entry point for the Mesos Application Master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts actor system and the actors for {@link JobManager}
  * and {@link MesosFlinkResourceManager}.
  *
  * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
@@ -168,6 +172,12 @@ public class MesosApplicationMasterRunner {
 		WebMonitor webMonitor = null;
 		MesosArtifactServer artifactServer = null;
 
+		int numberProcessors = Hardware.getNumberCPUCores();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -281,7 +291,9 @@ public class MesosApplicationMasterRunner {
 
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
-				config, actorSystem,
+				config,
+				actorSystem,
+				executor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -387,6 +399,8 @@ public class MesosApplicationMasterRunner {
 			LOG.error("Failed to stop the artifact server", t);
 		}
 
+		executor.shutdownNow();
+
 		return 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 113ab85..300539c 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
 /** JobManager actor for execution on Mesos. .
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -49,7 +49,7 @@ import scala.concurrent.duration._
   * @param leaderElectionService LeaderElectionService to participate in the leader election
   */
 class MesosJobManager(flinkConfiguration: FlinkConfiguration,
-                      executorService: ExecutorService,
+                      executor: Executor,
                       instanceManager: InstanceManager,
                       scheduler: FlinkScheduler,
                       libraryCacheManager: BlobLibraryCacheManager,
@@ -63,7 +63,7 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration,
                       metricsRegistry: Option[FlinkMetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 9d099e3..d01e0cf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			}
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index e012d0b..4b5bd2f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			ActorGateway taskManager = null;
 
 			try {
-				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+				jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());
 
 				final Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 1ae776c..fcdf94d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -179,6 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
 					jobManagerSystem[i],
+					jobManagerSystem[i].dispatcher(),
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
new file mode 100644
index 0000000..bd97963
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory which allows to specify a thread pool name and a thread name.
+ *
+ * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+	private static final AtomicInteger poolNumber = new AtomicInteger(1);
+	private final ThreadGroup group;
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+	private final String namePrefix;
+
+	public NamedThreadFactory(final String poolName, final String threadName) {
+		SecurityManager securityManager = System.getSecurityManager();
+		group = (securityManager != null) ? securityManager.getThreadGroup() :
+			Thread.currentThread().getThreadGroup();
+
+		namePrefix = poolName +
+			poolNumber.getAndIncrement() +
+			threadName;
+	}
+
+	@Override
+	public Thread newThread(Runnable runnable) {
+		Thread t = new Thread(group, runnable,
+			namePrefix + threadNumber.getAndIncrement(),
+			0);
+		if (t.isDaemon()) {
+			t.setDaemon(false);
+		}
+		if (t.getPriority() != Thread.NORM_PRIORITY) {
+			t.setPriority(Thread.NORM_PRIORITY);
+		}
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 72df671..0f31eba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
@@ -45,7 +45,7 @@ import scala.language.postfixOps
   * to start/administer/stop the session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -58,7 +58,7 @@ import scala.language.postfixOps
   */
 abstract class ContaineredJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -72,7 +72,7 @@ abstract class ContaineredJobManager(
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b2e1002..df80d72 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException}
+import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -118,7 +118,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executorService: ExecutorService,
+    protected val executor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -272,9 +272,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
     }
 
-    // shut down the extra thread pool for futures
-    executorService.shutdown()
-
     // failsafe shutdown of the metrics registry
     try {
       metricsRegistry.foreach(_.shutdown())
@@ -1250,7 +1247,7 @@ class JobManager(
           executionGraph,
           jobGraph,
           flinkConfiguration,
-          executorService,
+          executor,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1971,15 +1968,29 @@ object JobManager {
       listeningPort: Int)
     : Unit = {
 
-    val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
-      configuration,
-      executionMode,
-      listeningAddress,
-      listeningPort,
-      classOf[JobManager],
-      classOf[MemoryArchivist],
-      Option(classOf[StandaloneResourceManager])
-    )
+    val numberProcessors = Hardware.getNumberCPUCores()
+
+    val executor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+
+    val (jobManagerSystem, _, _, webMonitorOption, _) = try {
+      startActorSystemAndJobManagerActors(
+        configuration,
+        executionMode,
+        listeningAddress,
+        listeningPort,
+        executor,
+        classOf[JobManager],
+        classOf[MemoryArchivist],
+        Option(classOf[StandaloneResourceManager])
+      )
+    } catch {
+      case t: Throwable =>
+          executor.shutdownNow()
+
+        throw t
+    }
 
     // block until everything is shut down
     jobManagerSystem.awaitTermination()
@@ -1993,6 +2004,8 @@ object JobManager {
             LOG.warn("Could not properly stop the web monitor.", t)
         }
     }
+
+    executor.shutdownNow()
   }
 
   /**
@@ -2100,6 +2113,7 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
+    * @param executor to run the JobManager's futures
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one should be started
@@ -2111,6 +2125,7 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2179,6 +2194,7 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
+        executor,
         jobManagerClass,
         archiveClass)
 
@@ -2379,15 +2395,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.
+   * @param executor to run JobManager's futures
    * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
+      executor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutorService,
-    InstanceManager,
+    (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
@@ -2416,13 +2433,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-
-    val executorService: ExecutorService = new ForkJoinPool()
     
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2441,7 +2456,6 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
-        executorService.shutdownNow()
         
         throw t
     }
@@ -2494,8 +2508,7 @@ object JobManager {
         None
     }
 
-    (executorService,
-      instanceManager,
+    (instanceManager,
       scheduler,
       libraryCacheManager,
       restartStrategy,
@@ -2521,6 +2534,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2528,6 +2542,7 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
+      executor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2540,6 +2555,7 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param executor to run JobManager's futures
    * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
@@ -2551,14 +2567,14 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executorService: ExecutorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategy,
@@ -2570,6 +2586,7 @@ object JobManager {
     jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
+      executor,
       None)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount)
@@ -2583,7 +2600,7 @@ object JobManager {
     val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2617,7 +2634,7 @@ object JobManager {
   def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -2633,7 +2650,7 @@ object JobManager {
     Props(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 048b013..d9a208d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.{Executors, ForkJoinPool}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.ZooKeeperUtils
+import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
@@ -104,6 +105,10 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  val executor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(
       ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
@@ -399,6 +404,8 @@ abstract class FlinkMiniCluster(
 
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
+
+    executor.shutdownNow
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 43ccda9..59dd399 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -114,8 +114,7 @@ class LocalFlinkMiniCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (executorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategyFactory,
@@ -125,7 +124,10 @@ class LocalFlinkMiniCluster(
     submittedJobGraphStore,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService())
+    metricsRegistry) = JobManager.createJobManagerComponents(
+      config,
+      executor,
+      createLeaderElectionService())
 
     val archive = system.actorOf(
       getArchiveProps(
@@ -137,7 +139,7 @@ class LocalFlinkMiniCluster(
       getJobManagerProps(
         jobManagerClass,
         config,
-        executorService,
+        executor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index ff604f1..aeb1ae1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -397,10 +397,11 @@ public class JobManagerTest {
 		UUID leaderSessionId = null;
 		ActorGateway jobManager = new AkkaActorGateway(
 				JobManager.startJobManagerActors(
-						config,
-						system,
-						TestingJobManager.class,
-						MemoryArchivist.class)._1(),
+					config,
+					system,
+					system.dispatcher(),
+					TestingJobManager.class,
+					MemoryArchivist.class)._1(),
 				leaderSessionId);
 
 		LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
@@ -603,12 +604,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					config,
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);
@@ -729,12 +731,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					config,
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);
@@ -825,12 +828,13 @@ public class JobManagerTest {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
 			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-					new Configuration(),
-					actorSystem,
-					Option.apply("jm"),
-					Option.apply("arch"),
-					TestingJobManager.class,
-					TestingMemoryArchivist.class);
+				new Configuration(),
+				actorSystem,
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
 
 			jobManager = new AkkaActorGateway(master._1(), null);
 			archiver = new AkkaActorGateway(master._2(), null);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 9aeea3d..4f26e68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -80,10 +80,11 @@ public class JobSubmitTest {
 
 		// only start JobManager (no ResourceManager)
 		JobManager.startJobManagerActors(
-				jmConfig,
-				jobManagerSystem,
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			jmConfig,
+			jobManagerSystem,
+			jobManagerSystem.dispatcher(),
+			JobManager.class,
+			MemoryArchivist.class)._1();
 
 		try {
 			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index e0763f3..3c8ea75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager1");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +114,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
 			// start job manager which doesn't shutdown the actor system
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "jobmanager2");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2");
 
 			// Tell the JobManager to inform us of shutdown actions
 			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index c83ce58..5a98c8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -75,7 +75,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "ReconciliationTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 
@@ -129,7 +129,7 @@ public class ResourceManagerITCase extends TestLogger {
 		protected void run() {
 
 			ActorGateway jobManager =
-				TestingUtils.createJobManager(system, config, "RegTest");
+				TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
 			ActorGateway me =
 				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index f9434e2..83eaddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -85,6 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
 				actorSystem,
+				actorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index c7913f7..63c1b29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				new Configuration(),
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 53fa7c1..b21eba0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -106,7 +106,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 			try {
 				// a simple JobManager
-				jobManager = createJobManager(actorSystem, config);
+				jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
 				startResourceManager(config, jobManager.actor());
 
 				// start two TaskManagers. it will automatically try to register
@@ -187,8 +187,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				// now start the JobManager, with the regular akka URL
 				jobManager = createJobManager(
-						actorSystem,
-						new Configuration());
+					actorSystem,
+					actorSystem.dispatcher(),
+					new Configuration());
 
 				startResourceManager(config, jobManager.actor());
 
@@ -629,6 +630,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 		return JobManager.startJobManagerActors(
 			configuration,
 			actorSystem,
+			actorSystem.dispatcher(),
 			NONE_STRING,
 			NONE_STRING,
 			JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index f9c9b63..4485b65 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -172,6 +172,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e9bdb99..c6fd923 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.{Executor, ExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -39,7 +39,7 @@ import scala.language.postfixOps
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -53,7 +53,7 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 73fb928..b57a9dc 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,15 +303,18 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -322,85 +325,43 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       prefix
     )
   }
 
-  def createJobManager(
-      actorSystem: ActorSystem,
-      configuration: Configuration,
-      executionContext: ExecutionContext)
-    : ActorGateway = {
-
-    val (_,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategy,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
-      configuration,
-      None
-    )
-
-    val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount)
-
-    val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
-
-    val jobManagerProps = Props(
-      classOf[TestingJobManager],
-      configuration,
-      executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      archive,
-      restartStrategy,
-      timeout,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
-      jobRecoveryTimeout,
-      metricsRegistry)
-
-    val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
-
-    new AkkaActorGateway(jobManager, null)
-  }
-
   /**
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -408,6 +369,7 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -415,6 +377,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -427,6 +390,7 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
+        executor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index b6eb7ba..af86983 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b66fb5d..f72ef34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase {
 			ActorRef jmActor = JobManager.startJobManagerActors(
 				jmConfig,
 				jmActorSystem,
+				jmActorSystem.dispatcher(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index aef2604..aabc19d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,12 +18,11 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -41,7 +40,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -54,7 +53,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -68,7 +67,7 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 8e3418c..002e162 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -37,8 +37,10 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -67,13 +69,15 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
 /**
  * This class is the executable entry point for the YARN application master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts actor system and the actors for {@link JobManager}
  * and {@link YarnFlinkResourceManager}.
  * 
  * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
@@ -209,6 +213,12 @@ public class YarnApplicationMasterRunner {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
 
+		int numberProcessors = Hardware.getNumberCPUCores();
+
+		final ExecutorService executor = Executors.newFixedThreadPool(
+			numberProcessors,
+			new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
@@ -321,7 +331,9 @@ public class YarnApplicationMasterRunner {
 
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
-				config, actorSystem,
+				config,
+				actorSystem,
+				executor,
 				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
 				scala.Option.<String>empty(),
 				getJobManagerClass(),
@@ -414,6 +426,9 @@ public class YarnApplicationMasterRunner {
 				LOG.error("Failed to stop the web frontend", t);
 			}
 		}
+
+		executor.shutdownNow();
+
 		return 0;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 2df78c2..a81e6cf 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{Executor, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
@@ -40,7 +40,7 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -53,7 +53,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +67,7 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,


Mime
View raw message