Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 876D117D8C for ; Tue, 17 Feb 2015 09:43:07 +0000 (UTC) Received: (qmail 7072 invoked by uid 500); 17 Feb 2015 09:42:58 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 7042 invoked by uid 500); 17 Feb 2015 09:42:58 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 7033 invoked by uid 99); 17 Feb 2015 09:42:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Feb 2015 09:42:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1614E03C8; Tue, 17 Feb 2015 09:42:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-1557] Move JobManager web frontend server out of JobManager actor Date: Tue, 17 Feb 2015 09:42:57 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 743399a41 -> c1e326707 [FLINK-1557] Move JobManager web frontend server out of JobManager actor Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1e32670 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1e32670 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1e32670 Branch: refs/heads/master Commit: c1e326707a6c5b3cc550945f010eb4cabea24ad3 Parents: 743399a Author: Stephan Ewen Authored: Mon Feb 16 16:34:13 2015 +0100 Committer: Stephan Ewen Committed: Tue Feb 17 09:33:57 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/Configuration.java | 6 +- .../runtime/jobmanager/web/WebInfoServer.java | 11 ++- .../flink/runtime/jobmanager/JobManager.scala | 43 +++++++---- .../runtime/jobmanager/WithWebServer.scala | 40 ---------- .../runtime/minicluster/FlinkMiniCluster.scala | 3 +- .../minicluster/LocalFlinkMiniCluster.scala | 24 +++--- .../runtime/testingUtils/TestingCluster.scala | 2 +- .../apache/flink/yarn/ApplicationMaster.scala | 78 ++++++++++++-------- .../scala/org/apache/flink/yarn/Messages.scala | 7 +- .../org/apache/flink/yarn/YarnJobManager.scala | 10 +-- 10 files changed, 112 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index a1515d0..62598a4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; /** * Lightweight configuration object which can store key/value pairs. */ +@SuppressWarnings("EqualsBetweenInconvertibleTypes") public class Configuration implements IOReadableWritable, java.io.Serializable, Cloneable { private static final long serialVersionUID = 1L; @@ -584,6 +585,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable, return hash; } + @SuppressWarnings("EqualsBetweenInconvertibleTypes") @Override public boolean equals(Object obj) { if (this == obj) { @@ -596,11 +598,11 @@ public class Configuration implements IOReadableWritable, java.io.Serializable, Object thisVal = e.getValue(); Object otherVal = otherConf.get(e.getKey()); - if (thisVal.getClass() != byte[].class) { + if (!thisVal.getClass().equals(byte[].class)) { if (!thisVal.equals(otherVal)) { return false; } - } else if (otherVal.getClass() == byte[].class) { + } else if (otherVal.getClass().equals(byte[].class)) { if (!Arrays.equals((byte[]) thisVal, (byte[]) otherVal)) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 1166d70..02714b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.URL; import akka.actor.ActorRef; +import org.apache.flink.runtime.akka.AkkaUtils; import org.eclipse.jetty.server.handler.ResourceHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,13 +78,15 @@ public class WebInfoServer { * @throws IOException * Thrown, if the server setup failed for an I/O related reason. */ - public WebInfoServer(Configuration config, ActorRef jobmanager, - ActorRef archive, FiniteDuration timeout) throws IOException { - - // if no explicit configuration is given, use the global configuration + public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive) throws IOException { if (config == null) { throw new IllegalArgumentException("No Configuration has been passed to the web server"); } + if (jobmanager == null || archive == null) { + throw new NullPointerException(); + } + + final FiniteDuration timeout = AkkaUtils.getTimeout(config); this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d522d2d..4fe0ea6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -25,7 +25,8 @@ import akka.actor.Status.Failure import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph} +import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph} +import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.Messages.Acknowledge @@ -42,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, StackTrace, NextInputSplit, Heartbeat} +import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, NextInputSplit, Heartbeat} import org.apache.flink.runtime.profiling.ProfilingUtils import org.apache.flink.util.InstantiationUtil @@ -656,8 +657,10 @@ object JobManager { try { LOG.debug("Starting JobManager actor") - startActor(configuration, jobManagerSystem, true) + // bring up the job manager actor + val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem) + // bring up a local task manager, if needed if(executionMode.equals(LOCAL)){ LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") @@ -665,10 +668,14 @@ object JobManager { localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) } - jobManagerSystem.awaitTermination() + // start the job manager web frontend + LOG.info("Starting JobManger web frontend") + val webServer = new WebInfoServer(configuration, jobManager, archiver) + webServer.start() } catch { case t: Throwable => { + LOG.error("Error while starting up JobManager", t) try { jobManagerSystem.shutdown() } catch { @@ -677,6 +684,9 @@ object JobManager { throw t } } + + // block until everything is shut down + jobManagerSystem.awaitTermination() } /** @@ -814,9 +824,16 @@ object JobManager { profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount) } - def startActor(configuration: Configuration, - actorSystem: ActorSystem, - withWebServer: Boolean): ActorRef = { + /** + * Starts the JobManager and job archiver based on the given configuration, in the + * given actor system. + * + * @param configuration + * @param actorSystem + * @return A tuple of references (JobManager Ref, Archiver Ref) + */ + def startJobManagerActors(configuration: Configuration, + actorSystem: ActorSystem): (ActorRef, ActorRef) = { val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, profilerProps, executionRetries, delayBetweenRetries, @@ -827,17 +844,13 @@ object JobManager { val archiver: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME) - val jobManagerProps = if (withWebServer) { - Props(new JobManager(configuration, instanceManager, scheduler, - libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries, - delayBetweenRetries, timeout) with WithWebServer) - } else { - Props(classOf[JobManager], configuration, instanceManager, scheduler, + val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler, libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries, delayBetweenRetries, timeout) - } - startActor(jobManagerProps, actorSystem) + val jobManager = startActor(jobManagerProps, actorSystem) + + (jobManager, archiver) } def startActor(props: Props, actorSystem: ActorSystem): ActorRef = { http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala deleted file mode 100644 index bc83b9f..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor.Actor -import org.apache.flink.runtime.jobmanager.web.WebInfoServer - -/** - * Mixin for the [[JobManager]] which starts a [[WebInfoServer]] for the JobManager. - */ -trait WithWebServer extends Actor { - that: JobManager => - - val webServer = new WebInfoServer(configuration, self, archive, timeout) - webServer.start() - - abstract override def postStop(): Unit = { - log.info("Stopping webserver.") - webServer.stop() - log.info("Stopped webserver.") - - super.postStop() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 8f79003..6eea21c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -75,7 +75,8 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, def generateConfiguration(userConfiguration: Configuration): Configuration - def startJobManager(implicit system: ActorSystem): ActorRef + def startJobManager(system: ActorSystem): ActorRef + def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef def getJobManagerAkkaConfig: Config = { http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index c24d96a..8b16969 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -64,33 +64,31 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: config } - override def startJobManager(implicit system: ActorSystem): - ActorRef = { + override def startJobManager(system: ActorSystem): ActorRef = { val config = configuration.clone() - JobManager.startActor(config, system, false) + val (jobManager, _) = JobManager.startJobManagerActors(config, system) + jobManager } override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = { val config = configuration.clone() - val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_IPC_PORT) - val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants - .DEFAULT_TASK_MANAGER_DATA_PORT) + val rpcPort = config.getInteger( + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) + + val dataPort = config.getInteger( + ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) if(rpcPort > 0){ config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) } - if(dataPort > 0){ config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) } - val localExecution = if(numTaskManagers == 1){ - true - } else { - false - } + val localExecution = numTaskManagers == 1 TaskManager.startActorWithConfiguration(HOSTNAME, config, http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 6ae943d..e2660d5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -46,7 +46,7 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) { cfg } - override def startJobManager(implicit actorSystem: ActorSystem): ActorRef = { + override def startJobManager(actorSystem: ActorSystem): ActorRef = { val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ , executionRetries, delayBetweenRetries, http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 62b7caf..8ba4408 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -23,9 +23,10 @@ import java.security.PrivilegedAction import akka.actor._ import org.apache.flink.client.CliFrontend -import org.apache.flink.configuration.ConfigConstants +import org.apache.flink.configuration.{Configuration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager} +import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.yarn.Messages.StartYarnSession import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -56,15 +57,16 @@ object ApplicationMaster { ugi.doAs(new PrivilegedAction[Object] { override def run(): Object = { + var actorSystem: ActorSystem = null - var jobManager: ActorRef = ActorRef.noSender + var webserver: WebInfoServer = null try { val conf = new YarnConfiguration() val env = System.getenv() - if(LOG.isDebugEnabled) { + if (LOG.isDebugEnabled) { LOG.debug("All environment variables: " + env.toString) } @@ -74,48 +76,55 @@ object ApplicationMaster { val logDirs = env.get(Environment.LOG_DIRS.key()) // Note that we use the "ownHostname" given by YARN here, to make sure - // we use the hostnames given by YARN consitently throuout akka. + // we use the hostnames given by YARN consistently throughout akka. // for akka "localhost" and "localhost.localdomain" are different actors. val ownHostname = env.get(Environment.NM_HOST.key()) - require(ownHostname != null, s"Own hostname not set.") + require(ownHostname != null, "Own hostname in YARN not set.") val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES) - val jobManagerWebPort = 0 // automatic assignment. - - val (system, actor) = startJobManager(currDir, ownHostname,dynamicPropertiesEncodedString, - jobManagerWebPort, logDirs) + val (config, system, jobManager, archiver) = startJobManager(currDir, ownHostname, + dynamicPropertiesEncodedString, logDirs) actorSystem = system - jobManager = actor val extActor = system.asInstanceOf[ExtendedActorSystem] val jobManagerPort = extActor.provider.getDefaultAddress.port.get + // start the web info server + LOG.info("Starting Job Manger web frontend.") + webserver = new WebInfoServer(config, jobManager, archiver) + + val jobManagerWebPort = webserver.getServer.getConnectors()(0).getLocalPort + // generate configuration file for TaskManagers generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname, jobManagerPort, jobManagerWebPort, logDirs, slots, taskManagerCount, dynamicPropertiesEncodedString) - // send "start yarn session" message to YarnJobManager. - LOG.info("Start yarn session on job manager.") - jobManager ! StartYarnSession(conf, jobManagerPort) + LOG.info("Starting YARN session on Job Manager.") + jobManager ! StartYarnSession(conf, jobManagerPort, jobManagerWebPort) - LOG.info("Application Master properly initiated. Await termination of actor system.") + LOG.info("Application Master properly initiated. Awaiting termination of actor system.") actorSystem.awaitTermination() - }catch{ + } + catch { case t: Throwable => LOG.error("Error while running the application master.", t) - if(actorSystem != null){ + if (actorSystem != null) { actorSystem.shutdown() actorSystem.awaitTermination() - - actorSystem = null } } + finally { + if (webserver != null) { + LOG.debug("Stopping Job Manager web frontend.") + webserver.stop() + } + } null } @@ -166,10 +175,22 @@ object ApplicationMaster { output.close() } - def startJobManager(currDir: String, hostname: String, dynamicPropertiesEncodedString: String, - jobManagerWebPort: Int, logDirs: String): (ActorSystem, ActorRef) = { - - LOG.info("Start job manager for yarn") + /** + * Starts the JobManager and all its components. + * + * @param currDir + * @param hostname + * @param dynamicPropertiesEncodedString + * @param logDirs + * + * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef) + */ + def startJobManager(currDir: String, + hostname: String, + dynamicPropertiesEncodedString: String, + logDirs: String): (Configuration, ActorSystem, ActorRef, ActorRef) = { + + LOG.info("Starting JobManager for YARN") val args = Array[String]("--configDir", currDir) LOG.info(s"Config path: $currDir.") @@ -181,15 +202,13 @@ object ApplicationMaster { for(property <- dynamicProperties.asScala){ configuration.setString(property.f0, property.f1) } - configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, jobManagerWebPort) - configuration.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs) // set port to 0 to let Akka automatically determine the port. + LOG.debug("Starting JobManager actor system") val jobManagerSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, 0))) - LOG.info("Start job manager actor"); - // start all the components inside the job manager + LOG.debug("Starting JobManager components") val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, profilerProps, executionRetries, delayBetweenRetries, timeout, _) = JobManager.createJobManagerComponents(configuration) @@ -203,10 +222,11 @@ object ApplicationMaster { val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler, libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries, - delayBetweenRetries, timeout) with WithWebServer with YarnJobManager) + delayBetweenRetries, timeout) with YarnJobManager) + LOG.debug("Starting JobManager actor") val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem) - (jobManagerSystem, jobManager) + (configuration, jobManagerSystem, jobManager, archiver) } } http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala index 0ac135d..e880fdf 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala @@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records.FinalApplicationStatus object Messages { + case class YarnMessage(message: String, date: Date = new Date()) case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int) case object RegisterClient case class StopYarnSession(status: FinalApplicationStatus) + case object JobManagerStopped - case class StartYarnSession(configuration: Configuration, actorSystemPort: Int) + + case class StartYarnSession(configuration: Configuration, + actorSystemPort: Int, + webServerport: Int) case class JobManagerActorRef(jobManager: ActorRef) http://git-wip-us.apache.org/repos/asf/flink/blob/c1e32670/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index c8dbbf1..a37c8b4 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -25,7 +25,7 @@ import java.util.Collections import akka.actor.ActorRef import org.apache.flink.configuration.ConfigConstants import org.apache.flink.runtime.ActorLogMessages -import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager} +import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus import org.apache.flink.yarn.Messages._ import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner @@ -45,7 +45,7 @@ import scala.language.postfixOps trait YarnJobManager extends ActorLogMessages { - that: JobManager with WithWebServer => + that: JobManager => import context._ import scala.collection.JavaConverters._ @@ -101,7 +101,7 @@ trait YarnJobManager extends ActorLogMessages { sender() ! new FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers, instanceManager.getTotalNumberOfSlots) - case StartYarnSession(conf, actorSystemPort: Int) => + case StartYarnSession(conf, actorSystemPort, webServerport) => log.info("Start yarn session.") val memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager) @@ -120,8 +120,6 @@ trait YarnJobManager extends ActorLogMessages { val shipListString = env.get(FlinkYarnClient.ENV_CLIENT_SHIP_FILES) val yarnClientUsername = env.get(FlinkYarnClient.ENV_CLIENT_USERNAME) - val jobManagerWebPort = that.webServer.getServer.getConnectors()(0).getLocalPort - val rm = AMRMClient.createAMRMClient[ContainerRequest]() rm.init(conf) rm.start() @@ -136,7 +134,7 @@ trait YarnJobManager extends ActorLogMessages { nmClientOption = Some(nm) // Register with ResourceManager - val url = s"http://$applicationMasterHost:$jobManagerWebPort" + val url = s"http://$applicationMasterHost:$webServerport" log.info(s"Registering ApplicationMaster with tracking url $url.") rm.registerApplicationMaster(applicationMasterHost, actorSystemPort, url)