flink-commits mailing list archives

From se...@apache.org
Subject [2/9] flink git commit: [FLINK-3363] [jobmanager] Properly shut down executor thread pool when JobManager shuts down
Date Mon, 08 Feb 2016 23:06:06 GMT
[FLINK-3363] [jobmanager] Properly shut down executor thread pool when JobManager shuts down


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a277543c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a277543c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a277543c

Branch: refs/heads/master
Commit: a277543c57f7c633e0f8b610b241eac5a95aa7cc
Parents: af3e689
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 8 13:18:50 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 8 16:57:57 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  42 +++++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |   5 +-
 .../JobManagerLeaderElectionTest.java           |  16 ++-
 .../testingUtils/TestingJobManager.scala        |  17 +---
 .../LocalFlinkMiniClusterITCase.java            | 101 ++++++++++++++++---
 .../flink/yarn/TestingYarnJobManager.scala      |   9 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  14 ++-
 7 files changed, 149 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
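The core of the change, visible in the JobManager.scala diff below, is that the JobManager no longer receives a ready-made ExecutionContext but owns an ExecutorService: it derives its futures ExecutionContext from that pool with a logging error reporter, shuts the pool down in postStop(), and tears it down with shutdownNow() if component creation fails. The following is a minimal, self-contained sketch of that lifecycle pattern; it assumes Akka and a plain java.util.concurrent.ForkJoinPool, and the names MyJobManager and createComponents are illustrative only, not the actual Flink classes.

import java.util.concurrent.{ExecutorService, ForkJoinPool}

import akka.actor.Actor
import akka.event.Logging

import scala.concurrent.ExecutionContext

class MyJobManager(executorService: ExecutorService) extends Actor {

  private val log = Logging(context.system, this)

  // futures run on the externally owned pool; failed tasks are logged
  // instead of being silently dropped by a default reporter
  protected implicit val executionContext: ExecutionContext =
    ExecutionContext.fromExecutor(
      executorService,
      (t: Throwable) => log.error(t, "Executor could not execute task"))

  override def receive: Receive = {
    case msg => log.info("received message {}", msg)
  }

  override def postStop(): Unit = {
    // shut down the extra thread pool for futures once the actor is stopped
    executorService.shutdown()
  }
}

object MyJobManager {

  /** Creates the pool and disposes of it again if any later start-up step fails. */
  def createComponents(): ExecutorService = {
    val executorService: ExecutorService = new ForkJoinPool()
    try {
      // ... create blob server, instance manager, scheduler, etc. ...
      executorService
    } catch {
      case t: Throwable =>
        executorService.shutdownNow()
        throw t
    }
  }
}

The split between shutdown() on the regular path and shutdownNow() on the failure path mirrors the patch: a normal stop lets already submitted future callbacks finish, while a failed start-up aborts them immediately before the error is rethrown.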


http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bc7c134..d96575f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
 import java.util.UUID
+import java.util.concurrent.ExecutorService
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -90,7 +91,7 @@ import scala.language.postfixOps
  *  is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]]
  *
  * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an
-     ExecutionVertex contained in the [[ExecutionGraph]].
+  * ExecutionVertex contained in the [[ExecutionGraph]].
  * A successful update is acknowledged by true and otherwise false.
  *
  * - [[RequestNextInputSplit]] requests the next input split for a running task on a
@@ -102,7 +103,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executionContext: ExecutionContext,
+    protected val executorService: ExecutorService,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -121,6 +122,15 @@ class JobManager(
 
   override val log = Logger(getClass)
 
+  /** The extra execution context, for futures, with a custom logging reporter */
+  protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
+    executorService,
+    (t: Throwable) => {
+      if (!context.system.isTerminated) {
+        log.error("Executor could not execute task", t)
+      }
+    })
+  
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
@@ -246,6 +256,9 @@ class JobManager(
      case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
     }
 
+    // shut down the extra thread pool for futures
+    executorService.shutdown()
+    
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -1503,7 +1516,8 @@ class JobManager(
   
   /**
    * Updates the accumulators reported from a task manager via the Heartbeat message.
-   * @param accumulators list of accumulator snapshots
+    *
+    * @param accumulators list of accumulator snapshots
    */
   private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
     accumulators foreach {
@@ -2016,7 +2030,7 @@ object JobManager {
   def createJobManagerComponents(
       configuration: Configuration,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutionContext,
+    (ExecutorService,
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2064,17 +2078,19 @@ object JobManager {
           }
       }
 
-    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+    
 
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
 
+    val executorService: ExecutorService = new ForkJoinPool()
+    
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(executionContext)
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2093,6 +2109,8 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
+        executorService.shutdownNow()
+        
         throw t
     }
 
@@ -2122,7 +2140,7 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
-    (executionContext,
+    (executorService,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2143,8 +2161,7 @@ object JobManager {
    * @param actorSystem The actor system running the JobManager
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   *
-   * @return A tuple of references (JobManager Ref, Archiver Ref)
+    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
@@ -2174,8 +2191,7 @@ object JobManager {
    *                          the actor will have the name generated by the actor system.
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   *
-   * @return A tuple of references (JobManager Ref, Archiver Ref)
+    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
@@ -2186,7 +2202,7 @@ object JobManager {
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executionContext,
+    val (executorService: ExecutorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -2211,7 +2227,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executionContext,
+      executorService,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bdb62ba..4cdda3f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent._
+import scala.concurrent.forkjoin.ForkJoinPool
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -82,7 +83,7 @@ abstract class FlinkMiniCluster(
 
   /** Future lock */
   val futureLock = new Object()
-
+  
   implicit val executionContext = ExecutionContext.global
 
   implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
@@ -320,8 +321,6 @@ abstract class FlinkMiniCluster(
       _.map(gracefulStop(_, timeout))
     } getOrElse(Seq())
 
-    implicit val executionContext = ExecutionContext.global
-
     Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)
 
     if (!useSingleActorSystem) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 47255fc..f50a0a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -52,7 +52,9 @@ import org.junit.rules.TemporaryFolder;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.forkjoin.ForkJoinPool;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class JobManagerLeaderElectionTest extends TestLogger {
@@ -62,14 +64,16 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 
 	private static ActorSystem actorSystem;
 	private static TestingServer testingServer;
+	private static ExecutorService executor;
+	
 	private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
 	private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
 
 	@BeforeClass
 	public static void setup() throws Exception {
 		actorSystem = ActorSystem.create("TestingActorSystem");
-
 		testingServer = new TestingServer();
+		executor = new ForkJoinPool();
 	}
 
 	@AfterClass
@@ -78,9 +82,13 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			JavaTestKit.shutdownActorSystem(actorSystem);
 		}
 
-		if(testingServer != null) {
+		if (testingServer != null) {
 			testingServer.stop();
 		}
+		
+		if (executor != null) {
+			executor.shutdownNow();
+		}
 	}
 
 	/**
@@ -175,10 +183,10 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		return Props.create(
 				TestingJobManager.class,
 				configuration,
-				TestingUtils.defaultExecutionContext(),
+				executor,
 				new InstanceManager(),
 				new Scheduler(TestingUtils.defaultExecutionContext()),
-				new BlobLibraryCacheManager(new BlobServer(configuration), 10l),
+				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
 				ActorRef.noSender(),
 				1,
 				1L,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 7cbff48..0c0ca40 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -27,25 +28,17 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import java.util.concurrent.ExecutorService
+
 /** JobManager implementation extended by testing messages
   *
-  * @param flinkConfiguration
-  * @param executionContext
-  * @param instanceManager
-  * @param scheduler
-  * @param libraryCacheManager
-  * @param archive
-  * @param defaultExecutionRetries
-  * @param delayBetweenRetries
-  * @param timeout
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -58,7 +51,7 @@ class TestingJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
-    executionContext,
+      executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 8e87143..440cfff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,33 +29,42 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+
 import org.junit.Test;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.forkjoin.ForkJoinPool;
+import scala.concurrent.impl.ExecutionContextImpl;
 
-public class LocalFlinkMiniClusterITCase {
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
-	static ActorSystem system;
+import static org.junit.Assert.fail;
 
-	@BeforeClass
-	public static void setup() {
-		system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
-	}
+public class LocalFlinkMiniClusterITCase {
 
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
+	private static String[] ALLOWED_THREAD_PREFIXES = { };
 
 	@Test
 	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+		
+		final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
 		LocalFlinkMiniCluster miniCluster = null;
 
 		final int numTMs = 3;
 		final int numSlots = 14;
 
-		try{
+		// gather the threads that already exist
+		final Set<Thread> threadsBefore = new HashSet<>();
+		{
+			final Thread[] allThreads = new Thread[Thread.activeCount()];
+			Thread.enumerate(allThreads);
+			threadsBefore.addAll(Arrays.asList(allThreads));
+		}
+		
+		
+		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
@@ -90,7 +100,70 @@ public class LocalFlinkMiniClusterITCase {
 		} finally {
 			if (miniCluster != null) {
 				miniCluster.stop();
+				miniCluster.awaitTermination();
 			}
+
+			JavaTestKit.shutdownActorSystem(system);
+			system.awaitTermination();
+		}
+
+		// shut down the global execution context, to make sure it does not affect this testing
+		try {
+			Field f = ExecutionContextImpl.class.getDeclaredField("executor");
+			f.setAccessible(true);
+			
+			Object exec = ExecutionContext$.MODULE$.global();
+			ForkJoinPool executor = (ForkJoinPool) f.get(exec);
+			executor.shutdownNow();
+		}
+		catch (Exception e) {
+			System.err.println("Cannot test proper thread shutdown for local execution.");
+			return;
+		}
+		
+		// check for remaining threads
+		// we need to check repeatedly for a while, because some threads shut down slowly
+		
+		long deadline = System.currentTimeMillis() + 30000;
+		boolean foundThreads = true;
+		String threadName = "";
+		
+		while (System.currentTimeMillis() < deadline) {
+			// check that no additional threads remain
+			final Thread[] threadsAfter = new Thread[Thread.activeCount()];
+			Thread.enumerate(threadsAfter);
+
+			foundThreads = false;
+			for (Thread t : threadsAfter) {
+				if (t.isAlive() && !threadsBefore.contains(t)) {
+					// this thread was not there before. check if it is allowed
+					boolean allowed = false;
+					for (String prefix : ALLOWED_THREAD_PREFIXES) {
+						if (t.getName().startsWith(prefix)) {
+							allowed = true;
+							break;
+						}
+					}
+					
+					if (!allowed) {
+						foundThreads = true;
+						threadName = t.toString();
+						break;
+					}
+				}
+			}
+			
+			if (foundThreads) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException ignored) {}
+			} else {
+				break;
+			}
+		}
+		
+		if (foundThreads) {
+			fail("Thread " + threadName + " was started by the mini cluster, but not shut down");
 		}
 	}
 }
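The new test above verifies the fix by snapshotting the threads that exist before the mini cluster starts and then, after stopping the cluster (and reflectively shutting down Scala's global execution context so it cannot mask leaks), polling for up to 30 seconds until no unexpected threads remain. A condensed sketch of that leak check, written in Scala for consistency with the rest of the patch, is shown below; runAndStopCluster and assertNoLeakedThreads are placeholder names, not part of the Flink test utilities.

object ThreadLeakCheck {

  /** Snapshot of all live threads in the JVM at this moment. */
  private def aliveThreads(): Set[Thread] = {
    val threads = new Array[Thread](Thread.activeCount())
    Thread.enumerate(threads)
    threads.filter(t => t != null && t.isAlive).toSet
  }

  /** Runs the given action and fails if it leaves non-whitelisted threads behind. */
  def assertNoLeakedThreads(
      runAndStopCluster: () => Unit,
      allowedPrefixes: Seq[String] = Seq(),
      timeoutMillis: Long = 30000): Unit = {

    val before = aliveThreads()
    runAndStopCluster()

    def findLeaked(): Option[Thread] =
      (aliveThreads() -- before)
        .find(t => !allowedPrefixes.exists(p => t.getName.startsWith(p)))

    // some threads shut down slowly, so keep checking until the deadline
    val deadline = System.currentTimeMillis() + timeoutMillis
    var leaked = findLeaked()
    while (leaked.isDefined && System.currentTimeMillis() < deadline) {
      Thread.sleep(500)
      leaked = findLeaked()
    }

    leaked.foreach { t =>
      throw new AssertionError(s"Thread $t was started by the mini cluster, but not shut down")
    }
  }
}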

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index a7f21e5..2d50407 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn
 
+import java.util.concurrent.ExecutorService
+
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
@@ -28,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
 
 /** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
@@ -37,7 +38,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executionContext Execution context which is used to execute concurrent tasks in the
+  * @param executorService Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -51,7 +52,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -64,7 +65,7 @@ class TestingYarnJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends YarnJobManager(
     flinkConfiguration,
-    executionContext,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 708c1d8..8dfa22d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,10 +22,13 @@ import java.io.File
 import java.lang.reflect.Method
 import java.nio.ByteBuffer
 import java.util.Collections
+import java.util.concurrent.ExecutorService
 import java.util.{List => JavaList}
 
 import akka.actor.ActorRef
+
 import grizzled.slf4j.Logger
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -40,6 +43,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.yarn.YarnMessages._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
@@ -55,7 +59,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.YarnException
 import org.apache.hadoop.yarn.util.Records
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Try
@@ -64,7 +67,7 @@ import scala.util.Try
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executionContext Execution context which is used to execute concurrent tasks in the
+  * @param executorService Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -78,7 +81,7 @@ import scala.util.Try
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -91,7 +94,7 @@ class YarnJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
-    executionContext,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -587,7 +590,8 @@ class YarnJobManager(
 
   /**
    * Calculate the correct JVM heap memory limit.
-   * @param memoryLimit The maximum memory in megabytes.
+    *
+    * @param memoryLimit The maximum memory in megabytes.
    * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
    */
   private def calculateMemoryLimits(memoryLimit: Long): Long = {

