Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DAFD817694 for ; Tue, 24 Feb 2015 21:09:58 +0000 (UTC) Received: (qmail 70917 invoked by uid 500); 24 Feb 2015 21:09:58 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 70887 invoked by uid 500); 24 Feb 2015 21:09:58 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 70861 invoked by uid 99); 24 Feb 2015 21:09:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Feb 2015 21:09:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 833FBE03F7; Tue, 24 Feb 2015 21:09:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Tue, 24 Feb 2015 21:09:59 -0000 Message-Id: <0448b83a03d24b2d98e4bbffaa3da0ba@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization - Better checks during TaskManager startup - More robust initialization of TaskManager actor system and actor - Fix memory accounting during TaskManager startup - Better logging for TaskManagers started through YARN - Remove command line parameter hacking for YARN TaskManagers Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: 
http://git-wip-us.apache.org/repos/asf/flink/commit/ed8b26bf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed8b26bf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed8b26bf Branch: refs/heads/master Commit: ed8b26bf2e8dd7c187c24ad0d8ff3e67f6a7478c Parents: 4883af6 Author: Stephan Ewen Authored: Tue Feb 24 20:23:47 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 24 20:30:29 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/runtime/net/NetUtils.java | 2 +- .../runtime/util/EnvironmentInformation.java | 98 ++- .../apache/flink/runtime/util/MathUtils.java | 10 + .../flink/runtime/jobmanager/JobManager.scala | 50 +- .../runtime/minicluster/FlinkMiniCluster.scala | 57 +- .../minicluster/LocalFlinkMiniCluster.scala | 56 +- .../flink/runtime/taskmanager/TaskManager.scala | 644 +++++++++++++------ .../TaskManagerCLIConfiguration.scala | 3 +- .../taskmanager/TaskManagerConfiguration.scala | 7 +- .../taskmanager/TaskManagerProfiler.scala | 4 +- .../TaskManagerProcessReapingTest.java | 6 +- .../runtime/taskmanager/TaskManagerTest.java | 5 +- .../apache/flink/runtime/util/MathUtilTest.java | 34 +- .../TaskManagerRegistrationITCase.scala | 10 +- .../runtime/testingUtils/TestingCluster.scala | 18 +- .../testingUtils/TestingTaskManager.scala | 25 +- .../runtime/testingUtils/TestingUtils.scala | 38 +- .../test/util/ForkableFlinkMiniCluster.scala | 32 +- .../yarn/appMaster/YarnTaskManagerRunner.java | 63 +- .../org/apache/flink/yarn/YarnTaskManager.scala | 18 +- .../scala/org/apache/flink/yarn/YarnUtils.scala | 45 -- 21 files changed, 723 insertions(+), 502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java index 73504e9..8e0a41a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java @@ -107,7 +107,7 @@ public class NetUtils { break; default: - throw new RuntimeException("Unkown address detection strategy: " + strategy); + throw new RuntimeException("Unknown address detection strategy: " + strategy); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java index 535c756..d2147e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java @@ -36,12 +36,6 @@ public class EnvironmentInformation { private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class); private static final String UNKNOWN = ""; - - private static final String[] IGNORED_STARTUP_OPTIONS = { - "-Dlog.file", - "-Dlogback.configurationFile", - "-Dlog4j.configuration" - }; /** * Returns the version of the code as String. If version == null, then the JobManager does not run from a @@ -55,7 +49,7 @@ public class EnvironmentInformation { } /** - * Returns the code revision (commit and commit date) of Flink. + * Returns the code revision (commit and commit date) of Flink, as generated by the Maven builds. * * @return The code revision. 
*/ @@ -83,11 +77,6 @@ public class EnvironmentInformation { return info; } - public static class RevisionInformation { - public String commitId; - public String commitDate; - } - /** * Gets the name of the user that is running the JVM. * @@ -149,8 +138,12 @@ public class EnvironmentInformation { Runtime r = Runtime.getRuntime(); return r.maxMemory() - r.totalMemory() + r.freeMemory(); } - - + + /** + * Gets the version of the JVM in the form "VM_Name - Vendor - Spec/Version". + * + * @return The JVM version. + */ public static String getJvmVersion() { try { final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); @@ -161,24 +154,18 @@ public class EnvironmentInformation { } } + /** + * Gets the system parameters and environment parameters that were passed to the JVM on startup. + * + * @return The options passed to the JVM on startup. + */ public static String getJvmStartupOptions() { try { final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); final StringBuilder bld = new StringBuilder(); for (String s : bean.getInputArguments()) { - - boolean append = true; - for (String ignored : IGNORED_STARTUP_OPTIONS) { - if (s.startsWith(ignored)) { - append = false; - break; - } - } - - if (append) { - bld.append(s).append(' '); - } + bld.append(s).append(' '); } return bld.toString(); @@ -187,11 +174,23 @@ public class EnvironmentInformation { return UNKNOWN; } } - + + /** + * Gets the directory for temporary files, as returned by the JVM system property "java.io.tmpdir". + * + * @return The directory for temporary files. + */ public static String getTemporaryFileDirectory() { return System.getProperty("java.io.tmpdir"); } + /** + * Logs information about the environment, like code revision, current user, java version, + * and JVM parameters. + * + * @param log The logger to log the information to. + * @param componentName The component name to mention in the log.
+ */ public static void logEnvironmentInfo(Logger log, String componentName) { if (log.isInfoEnabled()) { RevisionInformation rev = getRevisionInformation(); @@ -206,7 +205,7 @@ public class EnvironmentInformation { long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20; - log.info("-------------------------------------------------------"); + log.info("--------------------------------------------------------------------------------"); log.info(" Starting " + componentName + " (Version: " + version + ", " + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")"); log.info(" Current user: " + user); @@ -214,15 +213,46 @@ public class EnvironmentInformation { log.info(" Startup Options: " + options); log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes"); log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome)); - log.info("-------------------------------------------------------"); + log.info("--------------------------------------------------------------------------------"); + } + } + + /** + * Checks whether the Java version is lower than Java 7 (Java 1.7) and + * prints a warning message in that case. + */ + public static void checkJavaVersion() { + try { + String versionString = System.getProperty("java.version").substring(0, 3); + double versionDouble = Double.parseDouble(versionString); + if (versionDouble < 1.7) { + LOG.warn("Flink has been started with Java 6. " + + "Java 6 is not maintained any more by Oracle or the OpenJDK community. 
" + + "Flink may drop support for Java 6 in future releases, due to the " + + "unavailability of bug fixes security patches."); + } + } + catch (Exception e) { + LOG.warn("Could not parse java version for startup checks"); + LOG.debug("Exception when parsing java version", e); } } // -------------------------------------------------------------------------------------------- - + + /** Don't instantiate this class */ private EnvironmentInformation() {} - - public static void main(String[] args) { - logEnvironmentInfo(LOG, "Test"); + + // -------------------------------------------------------------------------------------------- + + /** + * Revision information encapsulates information about the source code revision of the Flink + * code. + */ + public static class RevisionInformation { + /** The git commit id (hash) */ + public String commitId; + /** The git commit date */ + public String commitDate; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java index c8ab69c..ed3cdc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java @@ -99,6 +99,16 @@ public final class MathUtils { } return (int) value; } + + /** + * Checks whether the given value is a power of two. + * + * @param value The value to check. + * @return True, if the value is a power of two, false otherwise. 
+ */ + public static boolean isPowerOf2(long value) { + return (value & (value - 1)) == 0; + } // ============================================================================================ http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2671f2d..0630115 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -609,7 +609,7 @@ object JobManager { // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") - checkJavaVersion() + EnvironmentInformation.checkJavaVersion() // parsing the command line arguments val (configuration: Configuration, @@ -651,7 +651,7 @@ object JobManager { } catch { case t: Throwable => { - LOG.error("Failed to start JobManager.", t) + LOG.error("Failed to run JobManager.", t) System.exit(STARTUP_FAILURE_RETURN_CODE) } } @@ -711,10 +711,10 @@ object JobManager { // bring up a local task manager, if needed if (executionMode.equals(LOCAL)) { - LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") + LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode") - TaskManager.startActorWithConfiguration("", TaskManager.TASK_MANAGER_NAME, configuration, - localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress, + TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager]) } // start the job manager web frontend @@ -746,10 +746,10 @@ object JobManager { * @return Quadruple of 
configuration, execution mode and an optional listening address */ def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = { - val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") { - head("flink jobmanager") + val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") { + head("Flink JobManager") opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " + - "configuration directory.") + "the configuration directory.") opt[String]("executionMode") optional() action { (arg, c) => if(arg.equals("local")){ c.copy(executionMode = LOCAL) @@ -757,7 +757,7 @@ object JobManager { c.copy(executionMode = CLUSTER) } } text { - "Specify execution mode of job manager" + "Specify the execution mode of the JobManager (CLUSTER / LOCAL)" } } @@ -777,7 +777,7 @@ object JobManager { (configuration, config.executionMode, hostname, port) } getOrElse { - throw new Exception("Wrong arguments. Usage: " + parser.usage) + throw new Exception("Invalid command line arguments. Usage: " + parser.usage) } } @@ -986,34 +986,4 @@ object JobManager { val timeout = AkkaUtils.getLookupTimeout(config) getJobManagerRemoteReference(address, system, timeout) } - - - - // -------------------------------------------------------------------------- - // Miscellaneous Utils - // -------------------------------------------------------------------------- - - /** - * Checks whether the Java version is lower than Java 7 (Java 1.7) and - * prints a warning message in that case. - */ - private def checkJavaVersion(): Unit = { - try { - if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) { - LOG.warn("Flink has been started with Java 6. " + - "Java 6 is not maintained any more by Oracle or the OpenJDK community. 
" + - "Flink may drop support for Java 6 in future releases, due to the " + - "unavailability of bug fixes security patches.") - } - } - catch { - case e: Exception => - LOG.warn("Could not parse java version for startup checks") - LOG.debug("Exception when parsing java version", e) - } - } - - // -------------------------------------------------------------------------- - - class ParseException(message: String) extends Exception(message) {} } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 3679f02..0e29345 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -41,58 +41,67 @@ import scala.concurrent.{Future, Await} * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same * [[ActorSystem]], otherwise false */ -abstract class FlinkMiniCluster(userConfiguration: Configuration, +abstract class FlinkMiniCluster(val userConfiguration: Configuration, val singleActorSystem: Boolean) { - import FlinkMiniCluster._ + + protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster]) + + // -------------------------------------------------------------------------- + // Construction + // -------------------------------------------------------------------------- // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and // not getLocalHost(), which may be 127.0.1.1 val HOSTNAME = InetAddress.getByName("localhost").getHostAddress() - implicit val timeout = AkkaUtils.getTimeout(userConfiguration) + val timeout = 
AkkaUtils.getTimeout(userConfiguration) val configuration = generateConfiguration(userConfiguration) var jobManagerActorSystem = startJobManagerActorSystem() var jobManagerActor = startJobManager(jobManagerActorSystem) - val numTaskManagers = configuration.getInteger(ConfigConstants - .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - - val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield { - val actorSystem = if(singleActorSystem) { - jobManagerActorSystem - } else { - startTaskManagerActorSystem(i) - } + val numTaskManagers = configuration.getInteger( + ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - (actorSystem, startTaskManager(i)(actorSystem)) - } + var (taskManagerActorSystems, taskManagerActors) = + (for(i <- 0 until numTaskManagers) yield { + val actorSystem = if(singleActorSystem) { + jobManagerActorSystem + } else { + startTaskManagerActorSystem(i) + } - var (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip + (actorSystem, startTaskManager(i, actorSystem)) + }).unzip waitForTaskManagersToBeRegistered() + + // -------------------------------------------------------------------------- + // Construction + // -------------------------------------------------------------------------- + def generateConfiguration(userConfiguration: Configuration): Configuration def startJobManager(system: ActorSystem): ActorRef - def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef + def startTaskManager(index: Int, system: ActorSystem): ActorRef def getJobManagerAkkaConfig: Config = { - val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - - if(singleActorSystem){ + if (singleActorSystem) { AkkaUtils.getAkkaConfig(configuration, None) - }else{ + } + else { + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + AkkaUtils.getAkkaConfig(configuration, 
Some((HOSTNAME, port))) } } def startJobManagerActorSystem(): ActorSystem = { val config = getJobManagerAkkaConfig - AkkaUtils.createActorSystem(config) } @@ -170,7 +179,3 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, Await.ready(Future.sequence(futures), timeout) } } - -object FlinkMiniCluster{ - val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster]) -} http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 66ff4f3..37e41e2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -40,10 +40,9 @@ import org.slf4j.LoggerFactory * [[ActorSystem]], otherwise false */ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) - extends FlinkMiniCluster(userConfiguration, singleActorSystem){ - import LocalFlinkMiniCluster._ + extends FlinkMiniCluster(userConfiguration, singleActorSystem) { - val jobClientActorSystem = if(singleActorSystem){ + val jobClientActorSystem = if (singleActorSystem) { jobManagerActorSystem } else { // create an actor system listening on a random port @@ -52,25 +51,24 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: var jobClient: Option[ActorRef] = None + + override def generateConfiguration(userConfiguration: Configuration): Configuration = { val config = getDefaultConfig config.addAll(userConfiguration) - setMemory(config) - initializeIOFormatClasses(config) config } override def startJobManager(system: ActorSystem): 
ActorRef = { - val config = configuration.clone() - val (jobManager, _) = JobManager.startJobManagerActors(config, system) + val (jobManager, _) = JobManager.startJobManagerActors(configuration, system) jobManager } - override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = { + override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { val config = configuration.clone() val rpcPort = config.getInteger( @@ -81,26 +79,23 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) - if(rpcPort > 0){ + if (rpcPort > 0) { config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) } - if(dataPort > 0){ + if (dataPort > 0) { config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) } val localExecution = numTaskManagers == 1 - val taskManagerName = if(singleActorSystem) { + val taskManagerActorName = if (singleActorSystem) { TaskManager.TASK_MANAGER_NAME + "_" + (index + 1) } else { TaskManager.TASK_MANAGER_NAME } - TaskManager.startActorWithConfiguration(HOSTNAME, - taskManagerName, - config, - singleActorSystem, - localExecution)(system) + TaskManager.startTaskManagerActor(config, system, HOSTNAME, taskManagerActorName, + singleActorSystem, localExecution, classOf[TaskManager]) } def getJobClient(): ActorRef ={ @@ -177,7 +172,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: memorySize /= numTaskManager + 1 // the +1 is the job manager // for each TaskManager, subtract the memory needed for memory buffers - memorySize -= bufferMem; + memorySize -= bufferMem memorySize = (memorySize * memoryFraction).toLong memorySize >>>= 20 // bytes to megabytes config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize) @@ -189,31 +184,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: 
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME) - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - - config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) - - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) - - config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION) - - config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, - ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE) - - config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, - ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY) - - config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) - config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS) - // Reduce number of threads for local execution config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1) config.setInteger(NettyConfig.NUM_THREADS_SERVER, 2) @@ -221,7 +193,3 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: config } } - -object LocalFlinkMiniCluster{ - val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster]) -} http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 52a16bd..74c607e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -47,23 +47,23 @@ import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge} -import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, -RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} +import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} import org.apache.flink.runtime.messages.TaskManagerMessages._ -import org.apache.flink.runtime.messages.TaskManagerProfilerMessages -.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener} +import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener} import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.profiling.ProfilingUtils import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner -import org.apache.flink.runtime.util.EnvironmentInformation +import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation} import org.apache.flink.util.ExceptionUtils import org.slf4j.LoggerFactory import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{Failure, Success} +import scala.collection.JavaConverters._ + import scala.language.postfixOps /** @@ -87,7 +87,8 @@ import scala.language.postfixOps * * - ... 
*/ -class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String, +class TaskManager(val connectionInfo: InstanceConnectionInfo, + val jobManagerAkkaURL: String, val taskManagerConfig: TaskManagerConfiguration, val networkConfig: NetworkEnvironmentConfiguration) extends Actor with ActorLogMessages with ActorLogging { @@ -95,7 +96,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka import context._ import taskManagerConfig.{timeout => tmTimeout, _} -import scala.collection.JavaConverters._ + implicit val timeout = tmTimeout @@ -109,7 +110,6 @@ import scala.collection.JavaConverters._ var registrationDuration = 0 seconds var registrationAttempts: Int = 0 - TaskManager.checkTempDirs(tmpDirPaths) val ioManager = new IOManagerAsync(tmpDirPaths) val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize) val bcVarManager = new BroadcastVariableManager() @@ -123,7 +123,7 @@ import scala.collection.JavaConverters._ val profiler = profilingInterval match { case Some(interval) => log.info("Profiling of jobs is enabled.") - Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval)) + Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval, context.system)) case None => log.info("Profiling of jobs is disabled.") None @@ -140,11 +140,6 @@ import scala.collection.JavaConverters._ var instanceID: InstanceID = null var heartbeatScheduler: Option[Cancellable] = None - memoryLogggingIntervalMs.foreach { - interval => - val d = FiniteDuration(interval, TimeUnit.MILLISECONDS) - context.system.scheduler.schedule(d, d, self, LogMemoryUsage) - } override def preStart(): Unit = { tryJobManagerRegistration() @@ -717,7 +712,7 @@ import scala.collection.JavaConverters._ /** * TaskManager companion object. Contains TaskManager executable entry point, command - * line parsing, and constants. + * line parsing, constants, and setup methods for the TaskManager. 
*/ object TaskManager { @@ -734,296 +729,522 @@ object TaskManager { val MAX_REGISTRATION_ATTEMPTS = 10 val HEARTBEAT_INTERVAL = 5000 millisecond + + // -------------------------------------------------------------------------- + // TaskManager standalone entry point + // -------------------------------------------------------------------------- + + /** + * Entry point (main method) to run the TaskManager in a standalone fashion. + * + * @param args The command line arguments. + */ def main(args: Array[String]): Unit = { + // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager") - val (hostname, port, configuration) = parseArgs(args) + EnvironmentInformation.checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { - LOG.info("Security is enabled. Starting secure TaskManager.") - SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { - override def run(): Unit = { - startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME) - } - }) - } else { - startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME) + // try to parse the command line arguments + val configuration = try { + parseArgsAndLoadConfig(args) + } + catch { + case t: Throwable => { + LOG.error(t.getMessage(), t) + System.exit(STARTUP_FAILURE_RETURN_CODE) + null + } } - } - - def startActor(hostname: String, port: Int, configuration: Configuration, - taskManagerName: String) : Unit = { - - val (taskManagerSystem, taskManager) = startActorSystemAndActor(hostname, port, configuration, - taskManagerName, localAkkaCommunication = false, localTaskManagerCommunication = false) - - // start a process reaper that watches the JobManager. 
If the JobManager actor dies, - // the process reaper will kill the JVM process (to ensure easy failure detection) - taskManagerSystem.actorOf( - Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE), - "TaskManager_Process_Reaper") - // block until everything is done - taskManagerSystem.awaitTermination() + // run the TaskManager (if requested, in an authentication-enabled context) + try { + if (SecurityUtils.isSecurityEnabled) { + LOG.info("Security is enabled. Starting secure TaskManager.") + SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { + override def run(): Unit = { + runTaskManager(configuration, classOf[TaskManager]) + } + }) + } + else { + LOG.info("Security is not enabled. Starting non-authenticated TaskManager.") + runTaskManager(configuration, classOf[TaskManager]) + } + } + catch { + case t: Throwable => { + LOG.error("Failed to run TaskManager.", t) + System.exit(STARTUP_FAILURE_RETURN_CODE) + } + } } /** - * Parse the command line arguments of the [[TaskManager]]. The method loads the configuration, - * extracts the hostname and port on which the actor system shall listen. + * Parses the command line arguments of the [[TaskManager]] and loads the configuration. * * @param args Command line arguments - * @return Tuple of (hostname, port, configuration) + * @return The parsed configuration. */ - def parseArgs(args: Array[String]): (String, Int, Configuration) = { + @throws(classOf[Exception]) + def parseArgsAndLoadConfig(args: Array[String]): Configuration = { + + // set up the command line parser val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") { head("flink task manager") opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text "Specify configuration directory." + } - opt[String]("tempDir") optional() action { (x, c) => - c.copy(tmpDir = x) - } text "Specify temporary directory."
+ // parse the CLI arguments + val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse { + throw new Exception( + s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}") } + // load the configuration + try { + GlobalConfiguration.loadConfiguration(cliConfig.configDir) + GlobalConfiguration.getConfiguration() + } + catch { + case e: Exception => throw new Exception("Could not load configuration", e) + } + } - parser.parse(args, TaskManagerCLIConfiguration()) map { - config => - GlobalConfiguration.loadConfiguration(config.configDir) + // -------------------------------------------------------------------------- + // Starting and running the TaskManager + // -------------------------------------------------------------------------- - val configuration = GlobalConfiguration.getConfiguration + /** + * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. - if (config.tmpDir != null && GlobalConfiguration.getString(ConfigConstants - .TASK_MANAGER_TMP_DIR_KEY, - null) == null) { - configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, config.tmpDir) - } + * @param configuration The configuration for the TaskManager. + * @param taskManagerClass The actor class to instantiate. Allows to use TaskManager subclasses + * for example for YARN. + */ + @throws(classOf[Exception]) + def runTaskManager(configuration: Configuration, + taskManagerClass: Class[_ <: TaskManager]) : Unit = { + + val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration) + + // try to find out the hostname of the interface from which the TaskManager + // can connect to the JobManager. 
This involves a reverse name lookup + LOG.info("Trying to determine network interface and address/hostname to use") + val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort) + val taskManagerHostname = try { + NetUtils.resolveAddress(jobManagerAddress).getHostName() + } + catch { + case t: Throwable => throw new Exception("TaskManager cannot find a network interface " + + "that can communicate with the JobManager (" + jobManagerAddress + ")", t) + } - val jobManagerHostname = configuration.getString( - ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname) - val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + // if no task manager port has been configured, use 0 (system will pick any free port) + val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0) + if (actorSystemPort < 0) { + throw new Exception("Invalid value for '" + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + + "' (port for the TaskManager actor system) : " + actorSystemPort + + " - Leave config parameter empty or use 0 to let the system choose a port automatically.") + } - val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort) + runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager]) + } - val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0) - // try to find out the TaskManager's own hostname by connecting to jobManagerAddress - val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName + /** + * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. 
+ * + * This method will also spawn a process reaper for the TaskManager (kill the process if + * the actor fails) and optionally start the JVM memory logging thread. + * + * @param taskManagerHostname The hostname/address of the interface where the actor system + * will communicate. + * @param actorSystemPort The port at which the actor system will communicate. + * @param configuration The configuration for the TaskManager. + */ + @throws(classOf[Exception]) + def runTaskManager(taskManagerHostname: String, + actorSystemPort: Int, + configuration: Configuration) : Unit = { - (hostname, port, configuration) - } getOrElse { - LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.") - LOG.error("CLI parsing failed. Usage: " + parser.usage) - sys.exit(STARTUP_FAILURE_RETURN_CODE) + runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager]) + } + + /** + * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its + * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), + * and starts the TaskManager itself. + * + * This method will also spawn a process reaper for the TaskManager (kill the process if + * the actor fails) and optionally start the JVM memory logging thread. + * + * @param taskManagerHostname The hostname/address of the interface where the actor system + * will communicate. + * @param actorSystemPort The port at which the actor system will communicate. + * @param configuration The configuration for the TaskManager. + * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager + * subclasses for example for YARN. + */ + @throws(classOf[Exception]) + def runTaskManager(taskManagerHostname: String, + actorSystemPort: Int, + configuration: Configuration, + taskManagerClass: Class[_ <: TaskManager]) : Unit = { + + LOG.info("Starting TaskManager") + + // Bring up the TaskManager actor system first, bind it to the given address. 
+ LOG.info("Starting TaskManager actor system") + + val taskManagerSystem = try { + AkkaUtils.createActorSystem(configuration, Some((taskManagerHostname, actorSystemPort))) + } + catch { + case t: Throwable => { + if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { + val cause = t.getCause() + if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) { + val address = taskManagerHostname + ":" + actorSystemPort + throw new Exception("Unable to bind TaskManager actor system to address " + + address + " - " + cause.getMessage(), t) + } + } + throw new Exception("Could not create TaskManager actor system", t) + } + } + + // start all the TaskManager services (network stack, library cache, ...) + // and the TaskManager actor + try { + LOG.info("Starting TaskManager actor") + val taskManager = startTaskManagerActor(configuration, taskManagerSystem, taskManagerHostname, + TASK_MANAGER_NAME, false, false, taskManagerClass) + + // start a process reaper that watches the JobManager. 
If the JobManager actor dies, + // the process reaper will kill the JVM process (to ensure easy failure detection) + LOG.debug("Starting TaskManager process reaper") + taskManagerSystem.actorOf( + Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE), + "TaskManager_Process_Reaper") + + // if desired, start the logging daemon that periodically logs the + // memory usage information + if (LOG.isInfoEnabled && configuration.getBoolean( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { + LOG.info("Starting periodic memory usage logger") + + val interval = configuration.getLong( + ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, + ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS) + + val logger = new Thread("Memory Usage Logger") { + override def run(): Unit = { + try { + val memoryMXBean = ManagementFactory.getMemoryMXBean + val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala + + while (!taskManagerSystem.isTerminated) { + Thread.sleep(interval) + LOG.info(getMemoryUsageStatsAsString(memoryMXBean)) + LOG.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans)) + } + } + catch { + case t: Throwable => LOG.error("Memory usage logging thread died", t) + } + } + } + logger.setDaemon(true) + logger.start() + } + + // block until everything is done + taskManagerSystem.awaitTermination() + } + catch { + case t: Throwable => { + LOG.error("Error while starting up taskManager", t) + try { + taskManagerSystem.shutdown() + } catch { + case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt) + } + throw t + } } } - def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration, - taskManagerName: String, - localAkkaCommunication: Boolean, - localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = { - implicit val actorSystem = 
AkkaUtils.createActorSystem(configuration, Some((hostname, port))) + @throws(classOf[Exception]) + def startTaskManagerActor(configuration: Configuration, + actorSystem: ActorSystem, + taskManagerHostname: String, + taskManagerActorName: String, + localAkkaCommunication: Boolean, + localTaskManagerCommunication: Boolean, + taskManagerClass: Class[_ <: TaskManager]): ActorRef = { - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) = - parseConfiguration(hostname, configuration, localAkkaCommunication, - localTaskManagerCommunication) + val (tmConfig, netConfig, connectionInfo, jmAkkaURL) = parseTaskManagerConfiguration( + configuration, taskManagerHostname, localAkkaCommunication, localTaskManagerCommunication) + + val tmProps = Props(taskManagerClass, connectionInfo, jmAkkaURL, tmConfig, netConfig) + actorSystem.actorOf(tmProps, taskManagerActorName) + } + + /** + * Starts the profiler actor. + * + * @param instanceActorPath The actor path of the taskManager that is profiled. + * @param reportInterval The interval in which the profiler runs. + * @param actorSystem The actor system for the profiler actor + * @return The profiler actor ref. + */ + private def startProfiler(instanceActorPath: String, + reportInterval: Long, + actorSystem: ActorSystem): ActorRef = { + + val profilerProps = Props(classOf[TaskManagerProfiler], instanceActorPath, reportInterval) + actorSystem.actorOf(profilerProps, PROFILER_NAME) + } + + // -------------------------------------------------------------------------- + // Resolving the TaskManager actor + // -------------------------------------------------------------------------- + + /** + * Resolves the TaskManager actor reference in a blocking fashion. + * + * @param taskManagerUrl The akka URL of the JobManager. + * @param system The local actor system that should perform the lookup. + * @param timeout The maximum time to wait until the lookup fails. + * @throws java.io.IOException Thrown, if the lookup fails. 
+ * @return The ActorRef to the TaskManager + */ + @throws(classOf[IOException]) + def getTaskManagerRemoteReference(taskManagerUrl: String, + system: ActorSystem, + timeout: FiniteDuration): ActorRef = { + try { + val future = AkkaUtils.getReference(taskManagerUrl, system, timeout) + Await.result(future, timeout) + } + catch { + case e @ (_ : ActorNotFound | _ : TimeoutException) => + throw new IOException( + s"TaskManager at $taskManagerUrl not reachable. " + + s"Please make sure that the TaskManager is running and its port is reachable.", e) - (actorSystem, startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig, - networkConfig)) + case e: IOException => + throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e) + } } + // -------------------------------------------------------------------------- + // Miscellaneous Utilities + // -------------------------------------------------------------------------- + /** - * Extracts from the configuration the TaskManager's settings. Returns the TaskManager's - * connection information, the JobManager's Akka URL, the task manager configuration and the - * network connection configuration. + * Utility method to extract TaskManager config parameters from the configuration and to + * sanity check them. * - * @param hostname Hostname of the instance on which the TaskManager runs - * @param configuration Configuration instance containing the user provided configuration values - * @param localAkkaCommunication true if the TaskManager runs in the same [[ActorSystem]] as the - * JobManager, otherwise false - * @param localTaskManagerCommunication true if all TaskManager run in the same JVM, otherwise - * false - * @return Tuple of (TaskManager's connection information, JobManager's Akka URL, TaskManager's - * configuration, network connection configuration) + * @param configuration The configuration. + * @param taskManagerHostname The host name under which the TaskManager communicates. 
+ * @param localAkkaCommunication True, if the TaskManager runs in the same actor + * system as its JobManager. + * @param localTaskManagerCommunication True, to skip initializing the network stack. + * Use only when only one task manager is used. + * @return A tuple (TaskManagerConfiguration, network configuration, + * InstanceConnectionInfo, JobManager actor Akka URL). */ - def parseConfiguration(hostname: String, configuration: Configuration, - localAkkaCommunication: Boolean, - localTaskManagerCommunication: Boolean): - (InstanceConnectionInfo, String, TaskManagerConfiguration, NetworkEnvironmentConfiguration) = { + @throws(classOf[Exception]) + def parseTaskManagerConfiguration(configuration: Configuration, + taskManagerHostname: String, + localAkkaCommunication: Boolean, + localTaskManagerCommunication: Boolean): + (TaskManagerConfiguration, NetworkEnvironmentConfiguration, InstanceConnectionInfo, String) = { + + // ------- read values from the config and check them --------- + // (a lot of them) + + // ----> hosts / ports for communication and data exchange + val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match { - case 0 => NetUtils.getAvailablePort + case 0 => NetUtils.getAvailablePort() case x => x } - val connectionInfo = new InstanceConnectionInfo(InetAddress.getByName(hostname), dataport) + checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + "Leave config parameter empty or use 0 to let the system choose a port automatically.") + + val taskManagerAddress = InetAddress.getByName(taskManagerHostname) + val connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport) - val jobManagerURL = if (localAkkaCommunication) { + val jobManagerActorURL = if (localAkkaCommunication) { // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL JobManager.getLocalJobManagerAkkaURL } else { - val jobManagerAddress 
= configuration.getString( - ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) - - val jobManagerRPCPort = configuration.getInteger( - ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - - if (jobManagerAddress == null) { - throw new RuntimeException( - "JobManager address has not been specified in the configuration.") - } - - val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), - jobManagerRPCPort) + // both run in different actor system + val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration) + val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort) JobManager.getRemoteJobManagerAkkaURL(hostPort) } - val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories - val numberOfSlots = if (slots > 0) slots else 1 + // we need this because many configs have been written with a "-1" entry + val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match { + case -1 => 1 + case x => x + } val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) - - val tmpDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator) - val numNetworkBuffers = configuration.getInteger( ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) - val nettyConfig = localTaskManagerCommunication match { - case true => None - case false => Some(new NettyConfig( + val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L) + + checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + "Number of task slots must be at least one.") + + 
checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY) + + checkConfigParameter(pageSize >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSize, + ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, + "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE) + + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, + "Buffer size must be a power of 2.") + + checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, + ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, + "MemoryManager needs at least one MB of memory. " + + "Leave this config parameter empty to let the system automatically " + + "pick a fraction of the available memory.") + + val tmpDirs = configuration.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH) + .split(",|" + File.pathSeparator) + + checkTempDirs(tmpDirs) + + val nettyConfig = if (localTaskManagerCommunication) { + None + } else { + Some(new NettyConfig( connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration)) } val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig) - val networkBufferMem = if (localTaskManagerCommunication) 0 else numNetworkBuffers * pageSize - - val configuredMemory: Long = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1 - ) + val networkBufferMem = numNetworkBuffers * pageSize val memorySize = if (configuredMemory > 0) { - configuredMemory << 20 - } else { + LOG.info("Using {} MB for Flink managed memory.", configuredMemory) + configuredMemory << 20 // megabytes to bytes + } + else { val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + checkConfigParameter(fraction > 0.0f, fraction, + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + 
"MemoryManager fraction of the free memory must be positive.") - LOG.info("Using {} of the free heap space for managed memory.", fraction) + val relativeMemSize = ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() - + networkBufferMem) * fraction).toLong - ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - networkBufferMem) * fraction) - .toLong - } + LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).", + fraction, relativeMemSize >> 20) - val memoryLoggingIntervalMs = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD - ) match { - case true => Some( - configuration.getLong(ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS) - ) - case false => None + relativeMemSize } - val profilingInterval = configuration.getBoolean( - ProfilingUtils.ENABLE_PROFILING_KEY, false - ) match { - case true => Some(configuration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY, - ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL).toLong) - case false => None + // ----> timeouts, library caching, profiling + + val timeout = try { + AkkaUtils.getTimeout(configuration) + } + catch { + case e: Exception => throw new Exception( + s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. 
" + + s"Use formats like '50 s' or '1 min' to specify the timeout.") } + LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout) + + val profilingInterval = + if (configuration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) { + Some(configuration.getLong(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY, + ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL)) + } else { + None + } val cleanupInterval = configuration.getLong( ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - val timeout = AkkaUtils.getTimeout(configuration) + val maxRegistrationDuration = Duration(configuration.getString( ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)) - val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize, - tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout, - maxRegistrationDuration, configuration) - - (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) - } - - def startActor(taskManagerName: String, - connectionInfo: InstanceConnectionInfo, - jobManagerURL: String, - taskManagerConfig: TaskManagerConfiguration, - networkConfig: NetworkEnvironmentConfiguration) - (implicit actorSystem: ActorSystem): ActorRef = { - startActor(taskManagerName, - Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig))) - } + val taskManagerConfig = TaskManagerConfiguration(slots, memorySize, pageSize, + tmpDirs, cleanupInterval, profilingInterval, timeout, maxRegistrationDuration, + configuration) - def startActor(taskManagerName: String, props: Props) - (implicit actorSystem: ActorSystem): ActorRef = { - actorSystem.actorOf(props, taskManagerName) + (taskManagerConfig, networkConfig, connectionInfo, jobManagerActorURL) } - def startActorWithConfiguration(hostname: String, taskManagerName: String, - 
configuration: Configuration, - localAkkaCommunication: Boolean, - localTaskManagerCommunication: Boolean) - (implicit system: ActorSystem) = { - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = - parseConfiguration(hostname, configuration, localAkkaCommunication, - localTaskManagerCommunication) - - startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig, - networkConnectionConfiguration) - } - - def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem): - ActorRef = { - system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME) - } - - // -------------------------------------------------------------------------- - // Resolving the TaskManager actor - // -------------------------------------------------------------------------- - /** - * Resolves the TaskManager actor reference in a blocking fashion. + * Gets the hostname and port of the JobManager from the configuration. Also checks that + * the hostname is not null and the port non-negative. * - * @param taskManagerUrl The akka URL of the JobManager. - * @param system The local actor system that should perform the lookup. - * @param timeout The maximum time to wait until the lookup fails. - * @throws java.io.IOException Thrown, if the lookup fails. - * @return The ActorRef to the TaskManager + * @param configuration The configuration to read the config values from. + * @return A 2-tuple (hostname, port). 
*/ - @throws(classOf[IOException]) - def getTaskManagerRemoteReference(taskManagerUrl: String, - system: ActorSystem, - timeout: FiniteDuration): ActorRef = { - try { - val future = AkkaUtils.getReference(taskManagerUrl, system, timeout) - Await.result(future, timeout) + private def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = { + + val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + + if (hostname == null) { + throw new Exception("Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY + + "' is missing (hostname/address of JobManager to connect to).") } - catch { - case e @ (_ : ActorNotFound | _ : TimeoutException) => - throw new IOException( - s"TaskManager at $taskManagerUrl not reachable. " + - s"Please make sure that the TaskManager is running and its port is reachable.", e) - case e: IOException => - throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e) + if (port <= 0) { + throw new Exception("Invalid value for '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + + "' (port of the JobManager actor system) : " + port) } + + (hostname, port) } - // -------------------------------------------------------------------------- - // Miscellaneous Utilities - // -------------------------------------------------------------------------- + private def checkConfigParameter(condition: Boolean, + parameter: Any, + name: String, + errorMessage: String = ""): Unit = { + if (!condition) { + throw new Exception( + s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}") + } + } private def checkTempDirs(tmpDirs: Array[String]): Unit = { tmpDirs.zipWithIndex.foreach { @@ -1052,7 +1273,6 @@ object TaskManager { f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)") } case (_, id) => throw new Exception(s"Temporary 
file directory #$id is null.") - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala index d2d9cf4..5c71f5e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala @@ -22,6 +22,5 @@ package org.apache.flink.runtime.taskmanager * Command line configuration object for the [[TaskManager]] * * @param configDir Path to configuration directory - * @param tmpDir Path to temporary directory */ -case class TaskManagerCLIConfiguration(configDir: String = null, tmpDir: String = null) +case class TaskManagerCLIConfiguration(configDir: String = null) http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala index 8c1217e..19d55f7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala @@ -22,9 +22,10 @@ import org.apache.flink.configuration.Configuration import scala.concurrent.duration.{Duration, FiniteDuration} -case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int, - 
tmpDirPaths: Array[String], cleanupInterval: Long, - memoryLogggingIntervalMs: Option[Long], +case class TaskManagerConfiguration(numberOfSlots: Int, + memorySize: Long, pageSize: Int, + tmpDirPaths: Array[String], + cleanupInterval: Long, profilingInterval: Option[Long], timeout: FiniteDuration, maxRegistrationDuration: Duration, http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala index abd44b1..51e99f9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala @@ -43,8 +43,8 @@ import scala.concurrent.duration.FiniteDuration * @param instancePath Akka URL to [[TaskManager]] instance * @param reportInterval Interval of profiling action */ -class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with -ActorLogMessages with ActorLogging { +class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) + extends Actor with ActorLogMessages with ActorLogging { import context.dispatcher http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java index b92cfd4..2d752eb 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java @@ -55,8 +55,6 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; */ public class TaskManagerProcessReapingTest { - private static final String TASK_MANAGER_ACTOR_NAME = "TEST_TM"; - @Test public void testReapProcessOnFailure() { Process taskManagerProcess = null; @@ -108,7 +106,7 @@ public class TaskManagerProcessReapingTest { // grab the reference to the TaskManager. try multiple times, until the process // is started and the TaskManager is up String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s", - "127.0.0.1", taskManagerPort, TASK_MANAGER_ACTOR_NAME); + "127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME()); ActorRef taskManagerRef = null; for (int i = 0; i < 20; i++) { @@ -191,7 +189,7 @@ public class TaskManagerProcessReapingTest { cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); - TaskManager.startActor("localhost", taskManagerPort, cfg, TASK_MANAGER_ACTOR_NAME); + TaskManager.runTaskManager("localhost", taskManagerPort, cfg); // wait forever Object lock = new Object(); http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index e6a487d..d6724ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -539,7 +539,7 @@ public 
class TaskManagerTest { public static ActorRef createTaskManager(ActorRef jm) { Configuration cfg = new Configuration(); cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10); - GlobalConfiguration.includeConfiguration(cfg); + String jobManagerURL = jm.path().toString(); ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", @@ -551,7 +551,8 @@ public class TaskManagerTest { try { FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS); Await.ready(response, d); - }catch(Exception e){ + } + catch(Exception e) { throw new RuntimeException("Exception while waiting for the task manager registration.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java index c3d2c0f..6ffb64c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java @@ -16,20 +16,19 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; -import org.apache.flink.runtime.util.MathUtils; import org.junit.Test; -public class MathUtilTest -{ +public class MathUtilTest { + @Test - public void testLog2Computation() - { + public void testLog2Computation() { assertEquals(0, MathUtils.log2floor(1)); assertEquals(1, MathUtils.log2floor(2)); assertEquals(1, MathUtils.log2floor(3)); @@ -52,8 +51,7 @@ public class MathUtilTest } @Test - public void testRoundDownToPowerOf2() - { + public void testRoundDownToPowerOf2() { assertEquals(0, MathUtils.roundDownToPowerOf2(0)); assertEquals(1, MathUtils.roundDownToPowerOf2(1)); assertEquals(2, MathUtils.roundDownToPowerOf2(2)); @@ -80,4 +78,24 @@ public class MathUtilTest assertEquals(1073741824, MathUtils.roundDownToPowerOf2(1852987883)); assertEquals(1073741824, MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE)); } + + @Test + public void testPowerOfTwo() { + assertTrue(MathUtils.isPowerOf2(1)); + assertTrue(MathUtils.isPowerOf2(2)); + assertTrue(MathUtils.isPowerOf2(4)); + assertTrue(MathUtils.isPowerOf2(8)); + assertTrue(MathUtils.isPowerOf2(32768)); + assertTrue(MathUtils.isPowerOf2(65536)); + assertTrue(MathUtils.isPowerOf2(1 << 30)); + assertTrue(MathUtils.isPowerOf2(1L + Integer.MAX_VALUE)); + assertTrue(MathUtils.isPowerOf2(1L << 41)); + assertTrue(MathUtils.isPowerOf2(1L << 62)); + + assertFalse(MathUtils.isPowerOf2(3)); + assertFalse(MathUtils.isPowerOf2(5)); + assertFalse(MathUtils.isPowerOf2(567923)); + assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE)); + assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala index ccd326f..58905ef 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala @@ -45,7 +45,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "The JobManager" should { "notify already registered TaskManagers" in { - val jm = TestingUtils.startTestingJobManager + val jm = TestingUtils.startTestingJobManager(_system) val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) val hardwareDescription = HardwareDescription.extractFromSystem(10) @@ -67,7 +67,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "The TaskManager" should { "shutdown if its registration is refused by the JobManager" in { - val tm = TestingUtils.startTestingTaskManager(self) + val tm = TestingUtils.startTestingTaskManager(self, _system) watch(tm) @@ -83,7 +83,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "ignore RefuseRegistration messages after it has been successfully registered" in { - val tm = TestingUtils.startTestingTaskManager(self) + val tm = TestingUtils.startTestingTaskManager(self, _system) try { ignoreMsg{ @@ -112,8 +112,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val config = new Configuration() config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second") - val tm = TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", - self.path.toString, config) + val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", + self.path.toString, config, _system) watch(tm) 
http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 87fca99..9e53bcb 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -33,13 +33,13 @@ import org.apache.flink.runtime.taskmanager.TaskManager * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]], * otherwise false */ -class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) extends -FlinkMiniCluster(userConfiguration, singleActorSystem) { +class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) + extends FlinkMiniCluster(userConfiguration, singleActorSystem) { override def generateConfiguration(userConfig: Configuration): Configuration = { val cfg = new Configuration() cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost") - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort) + cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort()) cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10) cfg.addAll(userConfig) @@ -62,13 +62,11 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) { actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) } - override def startTaskManager(index: Int)(implicit system: ActorSystem) = { - val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(HOSTNAME, configuration, - localAkkaCommunication = singleActorSystem, 
localTaskManagerCommunication = true) + override def startTaskManager(index: Int, system: ActorSystem) = { - system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, - networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + "_" + - (index + 1)) + val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1) + + TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName, + singleActorSystem, true, classOf[TestingTaskManager]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 59106d3..06062f4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -19,14 +19,13 @@ package org.apache.flink.runtime.testingUtils import akka.actor.{Terminated, ActorRef} -import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.instance.InstanceConnectionInfo import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask -import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved -import org.apache.flink.runtime.ActorLogMessages import 
org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ @@ -34,10 +33,14 @@ import scala.concurrent.duration._ import scala.language.postfixOps /** - * Mixin for the [[TaskManager]] to support testing messages + * Subclass of the [[TaskManager]] to support testing messages */ -trait TestingTaskManager extends ActorLogMessages { - that: TaskManager => +class TestingTaskManager(connectionInfo: InstanceConnectionInfo, + jobManagerAkkaURL: String, + taskManagerConfig: TaskManagerConfiguration, + networkConfig: NetworkEnvironmentConfiguration) + extends TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConfig) { + val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() @@ -45,12 +48,16 @@ trait TestingTaskManager extends ActorLogMessages { var disconnectDisabled = false - abstract override def receiveWithLogMessages = { + + override def receiveWithLogMessages = { receiveTestMessages orElse super.receiveWithLogMessages } + /** + * Handler for testing related messages + */ def receiveTestMessages: Receive = { - + case RequestRunningTasks => sender ! 
ResponseRunningTasks(runningTasks.toMap) @@ -134,6 +141,4 @@ trait TestingTaskManager extends ActorLogMessages { case DisableDisconnect => disconnectDisabled = true } - - } http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 147cc8a..7d682f9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -57,17 +57,7 @@ object TestingUtils { def getDefaultTestingActorSystemConfig = testConfig - def startTestingTaskManagerWithConfiguration(hostname: String, jobManagerURL: String, - config: Configuration) - (implicit system: ActorSystem) = { - val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(hostname, config, - localAkkaCommunication = true, localTaskManagerCommunication = false) - system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, - networkConnectionConfig) with TestingTaskManager)) - } - - def startTestingJobManager(implicit system: ActorSystem): ActorRef = { + def startTestingJobManager(system: ActorSystem): ActorRef = { val config = new Configuration() val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ , @@ -84,15 +74,29 @@ object TestingUtils { system.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) } - def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { + def startTestingTaskManagerWithConfiguration(hostname: String, + jobManagerURL: String, + config: Configuration, + system: ActorSystem) = { + + val (tmConfig, netConfig, 
connectionInfo, _) = + TaskManager.parseTaskManagerConfiguration(config, hostname, true, false) + + val tmProps = Props(classOf[TestingTaskManager], connectionInfo, + jobManagerURL, tmConfig, netConfig) + system.actorOf(tmProps) + } + + def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = { + val jmURL = jobManager.path.toString val config = new Configuration() - val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration("localhost", config, - localAkkaCommunication = true, localTaskManagerCommunication = true) - system.actorOf(Props(new TaskManager(connectionInfo, jmURL, taskManagerConfig, - networkConnectionConfig) with TestingTaskManager)) + val (tmConfig, netConfig, connectionInfo, _) = + TaskManager.parseTaskManagerConfiguration(config, "localhost", true, true) + + val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig) + system.actorOf(tmProps) } def startTestingCluster(numSlots: Int, numTMs: Int = 1, http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index eeb622a..1b088fc 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -48,13 +48,14 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst val forkNumber = try { Integer.parseInt(forNumberString) - }catch{ + } + catch { case e: NumberFormatException => -1 } val config = userConfiguration.clone() - if(forkNumber != -1){ + if (forkNumber != -1) { val jobManagerRPC = 
1024 + forkNumber*300 val taskManagerRPC = 1024 + forkNumber*300 + 100 val taskManagerData = 1024 + forkNumber*300 + 200 @@ -83,29 +84,27 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) } - override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = { + override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { val config = configuration.clone() - val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_IPC_PORT) - val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_DATA_PORT) + val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) + + val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) - if(rpcPort > 0){ + if (rpcPort > 0) { config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) } - - if(dataPort > 0){ + if (dataPort > 0) { config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) } val localExecution = numTaskManagers == 1 - val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(HOSTNAME, config, singleActorSystem, localExecution) - - system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, - networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index) + TaskManager.startTaskManagerActor(config, system, HOSTNAME, + TaskManager.TASK_MANAGER_NAME + index, singleActorSystem, localExecution, + classOf[TestingTaskManager]) } def restartJobManager(): Unit = { @@ -127,7 +126,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst 
taskManagerActorSystems(index).awaitTermination() val taskManagerActorSystem = startTaskManagerActorSystem(index) - val taskManagerActor = startTaskManager(index)(jobManagerActorSystem) + val taskManagerActor = startTaskManager(index, taskManagerActorSystem) taskManagerActors = taskManagerActors.patch(index, Seq(taskManagerActor), 1) taskManagerActorSystems = taskManagerActorSystems.patch(index, Seq(taskManagerActorSystem), 1) @@ -141,6 +140,7 @@ object ForkableFlinkMiniCluster { def startCluster(numSlots: Int, numTaskManagers: Int, timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): ForkableFlinkMiniCluster = { + val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)