flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Tue, 16 Aug 2016 14:45:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422829#comment-15422829 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r74949114
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---
    @@ -0,0 +1,618 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
    +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
    +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
    +import org.apache.flink.mesos.util.MesosArtifactServer;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.mesos.util.ZooKeeperUtils;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.flink.runtime.jobmanager.MemoryArchivist;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.process.ProcessReaper;
    +import org.apache.flink.runtime.taskmanager.TaskManager;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.Option;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.net.InetAddress;
    +import java.net.URL;
    +import java.security.PrivilegedAction;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.flink.mesos.Utils.uri;
    +import static org.apache.flink.mesos.Utils.variable;
    +
    +/**
    + * This class is the executable entry point for the Mesos Application Master.
    + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
    + * and {@link MesosFlinkResourceManager}.
    + *
    + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles
container
    + * allocation and failure detection.
    + */
    +public class MesosApplicationMasterRunner {
    +	/** Logger for this class; it is also handed to the resource manager props,
    +	 * the TaskManager context builder and the process reapers created by this runner */
    +	protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
    +
    +	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
    +	 * before they quit (5 minutes). This value is written into the generated
    +	 * TaskManager configuration */
    +	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5,
TimeUnit.MINUTES);
    +
    +	/** The process environment variables, obtained once via System.getenv() when the class
    +	 * is initialized; the sandbox dir, session ID and TaskManager settings are read from it */
    +	private static final Map<String, String> ENV = System.getenv();
    +
    +	/** The exit code returned if the initialization of the application master failed
    +	 * (e.g. returned by run(String[]) when any Throwable escapes startup) */
    +	private static final int INIT_ERROR_EXIT_CODE = 31;
    +
    +	/** The exit code returned if the process exits because a critical actor died; it is
    +	 * passed to the process reapers that watch the JobManager and the resource master */
    +	private static final int ACTOR_DIED_EXIT_CODE = 32;
    +
    +	// ------------------------------------------------------------------------
    +	//  Program entry point
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * The entry point for the Mesos AppMaster.
    +	 *
    +	 * <p>Logs environment information, registers the signal handler (using this
    +	 * class's logger), runs {@link #run(String[])} on a fresh instance, and then
    +	 * terminates the JVM via {@link System#exit(int)} with the code that method
    +	 * returned. This method therefore does not return normally.
    +	 *
    +	 * @param args The command line arguments.
    +	 */
    +	public static void main(String[] args) {
    +		EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args);
    +		SignalHandler.register(LOG);
    +
    +		// run the AppMaster and terminate the JVM with the exit code it reports
    +		int returnCode = new MesosApplicationMasterRunner().run(args);
    +		System.exit(returnCode);
    +	}
    +
    +	/**
    +	 * The instance entry point for the Mesos AppMaster. Obtains user group
    +	 * information and calls the main work method {@link #runPrivileged()} as a
    +	 * privileged action.
    +	 *
    +	 * <p>Steps: logs all environment variables at DEBUG level, resolves the current
    +	 * user via {@link UserGroupInformation#getCurrentUser()}, logs its short name,
    +	 * and executes {@link #runPrivileged()} inside {@code currentUser.doAs(...)}.
    +	 * The value returned by {@link #runPrivileged()} is passed through as the exit code.
    +	 *
    +	 * <p>Any {@link Throwable} raised in this method, including one propagating out of
    +	 * {@link #runPrivileged()}, is logged and mapped to {@code INIT_ERROR_EXIT_CODE}
    +	 * instead of being rethrown.
    +	 *
    +	 * <p>NOTE(review): the DEBUG log statement dumps the full process environment;
    +	 * confirm that no secrets are passed to the AppMaster via environment variables.
    +	 *
    +	 * @param args The command line arguments (currently not read by this method).
    +	 * @return The process exit code.
    +	 */
    +	protected int run(String[] args) {
    +		try {
    +			LOG.debug("All environment variables: {}", ENV);
    +
    +			final UserGroupInformation currentUser;
    +			try {
    +				currentUser = UserGroupInformation.getCurrentUser();
    +			} catch (Throwable t) {
    +				throw new Exception("Cannot access UserGroupInformation information for current user",
t);
    +			}
    +
    +			LOG.info("Running Flink as user {}", currentUser.getShortUserName());
    +
    +			// run the actual work as the current user inside a privileged action
    +			return currentUser.doAs(new PrivilegedAction<Integer>() {
    +				@Override
    +				public Integer run() {
    +					return runPrivileged();
    +				}
    +			});
    +		}
    +		catch (Throwable t) {
    +			// make sure that whatever went wrong ends up in the log
    +			LOG.error("Mesos AppMaster initialization failed", t);
    +			return INIT_ERROR_EXIT_CODE;
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Core work method
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * The main work method, must run as a privileged action.
    +	 *
    +	 * @return The return code for the Java process.
    +	 */
    +	protected int runPrivileged() {
    +
    +		ActorSystem actorSystem = null;
    +		WebMonitor webMonitor = null;
    +		MesosArtifactServer artifactServer = null;
    +
    +		try {
    +			// ------- (1) load and parse / validate all configurations -------
    +
    +			// loading all config values here has the advantage that the program fails fast, if
any
    +			// configuration problem occurs
    +
    +			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
    +			require(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
    +
    +			final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
    +			require(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
    +
    +			// Note that we use the "appMasterHostname" given by the system, to make sure
    +			// we use the hostnames consistently throughout akka.
    +			// for akka "localhost" and "localhost.localdomain" are different actors.
    +			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
    +
    +			// Flink configuration
    +			final Configuration dynamicProperties =
    +				FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
    +			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
    +
    +			final Configuration config = createConfiguration(workingDir, dynamicProperties);
    +
    +			// Mesos configuration
    +			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
    +
    +			// environment values related to TM
    +			final int taskManagerContainerMemory;
    +			final int numInitialTaskManagers;
    +			final int slotsPerTaskManager;
    +
    +			try {
    +				taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY +
" : "
    +					+ e.getMessage());
    +			}
    +			try {
    +				numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT +
" : "
    +					+ e.getMessage());
    +			}
    +			try {
    +				slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
    +			} catch (NumberFormatException e) {
    +				throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " :
"
    +					+ e.getMessage());
    +			}
    +
    +			final ContaineredTaskManagerParameters containeredParameters =
    +				ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
    +
    +			final MesosTaskManagerParameters taskManagerParameters =
    +				MesosTaskManagerParameters.create(config, containeredParameters);
    +
    +			LOG.info("TaskManagers will be created with {} task slots",
    +				taskManagerParameters.containeredParameters().numSlots());
    +			LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {}
MB, " +
    +					"JVM direct memory limit {} MB, {} cpus",
    +				taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
    +				taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
    +				taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
    +				taskManagerParameters.cpus());
    +
    +			// JM endpoint, which should be explicitly configured by the dispatcher (based on
acquired net resources)
    +			final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
    +				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
    +			require(listeningPort >= 0 && listeningPort <= 65536, "Config parameter
\"" +
    +				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and
65536");
    +
    +			// ----------------- (2) start the actor system -------------------
    +
    +			// try to start the actor system, JobManager and JobManager actor system
    +			// using the configured address and ports
    +			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort,
LOG);
    +
    +			final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get();
    +			final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get();
    +
    +			LOG.info("Actor system bound to hostname {}.", akkaHostname);
    +
    +			// try to start the artifact server
    +			LOG.debug("Starting Artifact Server");
    +			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
    +				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
    +			artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort);
    +
    +			// ----------------- (3) Generate the configuration for the TaskManagers -------------------
    +
    +			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
    +				config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
    +			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
    +
    +			final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext(
    +				config, mesosConfig, ENV,
    +				taskManagerParameters, taskManagerConfig,
    +				workingDir, getTaskManagerClass(), artifactServer, LOG);
    +
    +			// ----------------- (4) start the actors -------------------
    +
    +			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
    +			// 2) Web Monitor (we need its port to register)
    +			// 3) Resource Master for Mesos
    +			// 4) Process reapers for the JobManager and Resource Master
    +
    +			// 1: the JobManager
    +			LOG.debug("Starting JobManager actor");
    +
    +			// we start the JobManager with its standard name
    +			ActorRef jobManager = JobManager.startJobManagerActors(
    +				config, actorSystem,
    +				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
    +				scala.Option.<String>empty(),
    +				getJobManagerClass(),
    +				getArchivistClass())._1();
    +
    +
    +			// 2: the web monitor
    +			LOG.debug("Starting Web Frontend");
    +
    +			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager,
LOG);
    +			if(webMonitor != null) {
    +				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(),
"/");
    +				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
    +			}
    +
    +			// 3: Flink's Mesos ResourceManager
    +			LOG.debug("Starting Mesos Flink Resource Manager");
    +
    +			// create the worker store to persist task information across restarts
    +			MesosWorkerStore workerStore = createWorkerStore(config);
    +
    +			// we need the leader retrieval service here to be informed of new
    +			// leader session IDs, even though there can be only one leader ever
    +			LeaderRetrievalService leaderRetriever =
    +				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
    +
    +			Props resourceMasterProps = MesosFlinkResourceManager.createActorProps(
    +				getResourceManagerClass(),
    +				config,
    +				mesosConfig,
    +				workerStore,
    +				leaderRetriever,
    +				taskManagerParameters,
    +				taskManagerContext,
    +				numInitialTaskManagers,
    +				LOG);
    +
    +			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
    +
    +
    +			// 4: Process reapers
    +			// The process reapers ensure that upon unexpected actor death, the process exits
    +			// and does not stay lingering around unresponsive
    +
    +			LOG.debug("Starting process reapers for JobManager");
    +
    +			actorSystem.actorOf(
    +				Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
    +				"Mesos_Resource_Master_Process_Reaper");
    +
    +			actorSystem.actorOf(
    +				Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
    +				"JobManager_Process_Reaper");
    +		}
    +		catch (Throwable t) {
    +			// make sure that everything whatever ends up in the log
    +			LOG.error("Mesos JobManager initialization failed", t);
    +
    +			if (actorSystem != null) {
    +				try {
    +					actorSystem.shutdown();
    +				} catch (Throwable tt) {
    +					LOG.error("Error shutting down actor system", tt);
    +				}
    +			}
    +
    +			if (webMonitor != null) {
    --- End diff --
    
    The web monitor depends on the actor system (it holds an ActorRef). Consequently, it should be stopped before the actor system is shut down.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message