flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3544) ResourceManager runtime components
Date Tue, 22 Mar 2016 15:37:25 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206573#comment-15206573
] 

ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57008796
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
---
    @@ -0,0 +1,601 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +
    +import org.apache.flink.client.CliFrontend;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.flink.runtime.jobmanager.MemoryArchivist;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.process.ProcessReaper;
    +import org.apache.flink.runtime.taskmanager.TaskManager;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.DataOutputBuffer;
    +import org.apache.hadoop.security.Credentials;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.util.Records;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.security.PrivilegedAction;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * This class is the executable entry point for the YARN application master.
    + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
    + * and {@link YarnFlinkResourceManager}.
    + * 
    + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles
container
    + * allocation and failure detection.
    + */
    +public class YarnApplicationMasterRunner {
    +
    +	/** Logger */
    +	protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
    +
    +	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
    +	 * before they quit */
    +	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5,
TimeUnit.MINUTES);
    +	
    +	/** The process environment variables */
    +	private static final Map<String, String> ENV = System.getenv();
    +	
    +	/** The exit code returned if the initialization of the application master failed */
    +	private static final int INIT_ERROR_EXIT_CODE = 31;
    +
    +	/** The exit code returned if the process exits because a critical actor died */
    +	private static final int ACTOR_DIED_EXIT_CODE = 32;
    +
    +
    +	// ------------------------------------------------------------------------
    +	//  Program entry point
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * The entry point for the YARN application master. 
    +	 *
    +	 * @param args The command line arguments.
    +	 */
    +	public static void main(String[] args) {
    +		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager",
args);
    +		SignalHandler.register(LOG);
    +
    +		// run and exit with the proper return code
    +		int returnCode = new YarnApplicationMasterRunner().run(args);
    +		System.exit(returnCode);
    +	}
    +	
    +	/**
    +	 * The instance entry point for the YARN application master. Obtains user group
    +	 * information and calls the main work method {@link #runApplicationMaster()} as a
    +	 * privileged action.
    +	 *
    +	 * @param args The command line arguments.
    +	 * @return The process exit code.
    +	 */
    +	protected int run(String[] args) {
    +		try {
    +			LOG.debug("All environment variables: {}", ENV);
    +			
    +			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
    +			require(yarnClientUsername != null, "YARN client user name environment variable {}
not set",
    +				YarnConfigKeys.ENV_CLIENT_USERNAME);
    +		
    +			final UserGroupInformation currentUser;
    +			try {
    +				currentUser = UserGroupInformation.getCurrentUser();
    +			} catch (Throwable t) {
    +				throw new Exception("Cannot access UserGroupInformation information for current user",
t);
    +			}
    +		
    +			LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager
as user {}",
    +				currentUser.getShortUserName(), yarnClientUsername);
    +	
    +			UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
    +			
    +			// transfer all security tokens, for example for authenticated HDFS and HBase access
    +			for (Token<?> token : currentUser.getTokens()) {
    +				ugi.addToken(token);
    +			}
    +	
    +			// run the actual work in a secured privileged action
    +			return ugi.doAs(new PrivilegedAction<Integer>() {
    +				@Override
    +				public Integer run() {
    +					return runApplicationMaster();
    +				}
    +			});
    +		}
    +		catch (Throwable t) {
    +			// make sure that everything whatever ends up in the log
    +			LOG.error("YARN Application Master initialization failed", t);
    +			return INIT_ERROR_EXIT_CODE;
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Core work method
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * The main work method, must run as a privileged action.
    +	 * 
    +	 * @return The return code for the Java process. 
    +	 */
    +	protected int runApplicationMaster() {
    +		ActorSystem actorSystem = null;
    +		WebMonitor webMonitor = null;
    +		
    +		try {
    +			// ------- (1) load and parse / validate all configurations -------
    +			
    +			// loading all config values here has the advantage that the program fails fast, if
any
    +			// configuration problem occurs
    +			
    +			final String currDir = ENV.get(Environment.PWD.key());
    +			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
    +	
    +			// Note that we use the "appMasterHostname" given by YARN here, to make sure
    +			// we use the hostnames given by YARN consistently throughout akka.
    +			// for akka "localhost" and "localhost.localdomain" are different actors.
    +			final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
    +			require(appMasterHostname != null,
    +				"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
    +	
    +			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
    +			
    +			// Flink configuration
    +			final Map<String, String> dynamicProperties =
    +				CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
    +			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
    +			
    +			final Configuration config = createConfiguration(currDir, dynamicProperties);
    +			
    +			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
    +			final YarnConfiguration yarnConfig = new YarnConfiguration();
    +			
    +			final int taskManagerContainerMemory;
    +			final int numInitialTaskManagers;
    +			final int slotsPerTaskManager;
    +
    +			try {
    +				taskManagerContainerMemory = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_MEMORY +
" : "
    +					+ e.getMessage());
    +			}
    +			try {
    +				numInitialTaskManagers = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_COUNT + "
: "
    +					+ e.getMessage());
    +			}
    +			try {
    +				slotsPerTaskManager = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_SLOTS + " :
"
    +					+ e.getMessage());
    +			}
    +			
    +			final ContaineredTaskManagerParameters taskManagerParameters =
    +				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
    +
    +			LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots());
    +			LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {}
MB, " +
    +				"JVM direct memory limit {} MB",
    +				taskManagerParameters.taskManagerTotalMemoryMB(),
    +				taskManagerParameters.taskManagerHeapSizeMB(),
    +				taskManagerParameters.taskManagerDirectMemoryLimitMB());
    +			
    +			
    +			// ----------------- (2) start the actor system -------------------
    +			
    +			// try to start the actor system, JobManager and JobManager actor system
    +			// using the port range definition from the config.
    +			final String amPortRange = config.getString(
    +					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
    +					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
    +			
    +			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange,
LOG);
    +			
    +			final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
    +			final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();
    +
    +			LOG.info("Actor system bound to hostname {}.", akkaHostname);
    +			
    +			
    +			// ---- (3) Generate the configuration for the TaskManagers
    +			
    +			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
    +					config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
    +			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
    +			
    +			final ContainerLaunchContext taskManagerContext = createTaskManagerContext(
    +				config, yarnConfig, ENV,
    +				taskManagerParameters, taskManagerConfig,
    +				currDir, getTaskManagerClass(), LOG);
    +			
    +			
    +			// ---- (4) start the actors and components in this order:
    +			
    +			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
    +			// 2) Web Monitor (we need its port to register)
    +			// 3) Resource Master for YARN
    +			// 4) Process reapers for the JobManager and Resource Master
    +
    +			
    +			// 1: the JobManager
    +			LOG.debug("Starting JobManager actor");
    +
    +			// we start the JobManager with its standard name
    +			ActorRef jobManager = JobManager.startJobManagerActors(
    +				config, actorSystem,
    +				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
    +				scala.Option.<String>empty(),
    +				getJobManagerClass(),
    +				getArchivistClass())._1();
    +
    +
    +			// 2: the web monitor
    +			LOG.debug("Starting Web Frontend");
    +
    +			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager,
LOG);
    +			final String webMonitorURL = webMonitor == null ? null :
    +				"http://" + appMasterHostname + ":" + webMonitor.getServerPort();
    +
    +			// 3: Flink's Yarn resource manager
    +			LOG.debug("Starting YARN Flink Resource Manager");
    +
    +			// we need the leader retrieval service here to be informed of new
    +			// leader session IDs, even though there can be only one leader ever
    +			LeaderRetrievalService leaderRetriever = 
    +				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
    +			
    +			Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
    +				getResourceManagerClass(),
    +				config,
    +				yarnConfig,
    +				leaderRetriever,
    +				appMasterHostname,
    +				webMonitorURL,
    +				taskManagerParameters,
    +				taskManagerContext,
    +				numInitialTaskManagers, 
    +				LOG);
    +			
    +			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
    +			
    +			
    +			// 4: Process reapers
    +			// The process reapers ensure that upon unexpected actor death, the process exits
    +			// and does not stay lingering around unresponsive
    +			
    +			LOG.debug("Starting process reapers for JobManager and YARN Application Master");
    +
    +			actorSystem.actorOf(
    +				Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
    +				"YARN_Resource_Master_Process_Reaper");
    +			
    +			actorSystem.actorOf(
    +				Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
    +				"JobManager_Process_Reaper");
    +		}
    +		catch (Throwable t) {
    +			// make sure that everything whatever ends up in the log
    +			LOG.error("YARN Application Master initialization failed", t);
    +			
    +			if (actorSystem != null) {
    +				try {
    +					actorSystem.shutdown();
    +				} catch (Throwable tt) {
    +					LOG.error("Error shutting down actor system", tt);
    +				}
    +			}
    +
    +			if (webMonitor != null) {
    +				try {
    +					webMonitor.stop();
    +				} catch (Throwable ignored) {}
    +			}
    +			
    +			return INIT_ERROR_EXIT_CODE;
    +		}
    +		
    +		// everything started, we can wait until all is done or the process is killed
    +		LOG.info("YARN Application Master started");
    +		
    +		// wait until everything is done
    +		actorSystem.awaitTermination();
    +
    +		// if we get here, everything work out jolly all right, and we even exited smoothly
    +		if (webMonitor != null) {
    +			try {
    +				webMonitor.stop();
    +			} catch (Throwable ignored) {}
    --- End diff --
    
    Would be maybe good to log the error.


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message