flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Tue, 16 Aug 2016 16:07:23 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422942#comment-15422942
] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r74965913
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode>
{
    +
    +	/** The Mesos configuration (master and framework info) */
    +	private final MesosConfiguration mesosConfig;
    +
    +	/** The TaskManager container parameters (like container memory size) */
    +	private final MesosTaskManagerParameters taskManagerParameters;
    +
    +	/** Context information used to start a TaskManager Java process */
    +	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +	/** Number of failed Mesos tasks before stopping the application. -1 means infinite.
*/
    +	private final int maxFailedTasks;
    +
    +	/** Callback handler for the asynchronous Mesos scheduler */
    +	private SchedulerProxy schedulerCallbackHandler;
    +
    +	/** Mesos scheduler driver */
    +	private SchedulerDriver schedulerDriver;
    +
    +	private ActorRef connectionMonitor;
    +
    +	private ActorRef taskRouter;
    +
    +	private ActorRef launchCoordinator;
    +
    +	private ActorRef reconciliationCoordinator;
    +
    +	private MesosWorkerStore workerStore;
    +
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +	/** The number of failed tasks since the master became active */
    +	private int failedTasksSoFar;
    +
    +	public MesosFlinkResourceManager(
    +		Configuration flinkConfig,
    +		MesosConfiguration mesosConfig,
    +		MesosWorkerStore workerStore,
    +		LeaderRetrievalService leaderRetrievalService,
    +		MesosTaskManagerParameters taskManagerParameters,
    +		Protos.TaskInfo.Builder taskManagerLaunchContext,
    +		int maxFailedTasks,
    +		int numInitialTaskManagers) {
    +
    +		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
    +
    +		this.mesosConfig = requireNonNull(mesosConfig);
    +
    +		this.workerStore = requireNonNull(workerStore);
    +
    +		this.taskManagerParameters = requireNonNull(taskManagerParameters);
    +		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
    +		this.maxFailedTasks = maxFailedTasks;
    +
    +		this.workersInNew = new HashMap<>();
    +		this.workersInLaunch = new HashMap<>();
    +		this.workersBeingReturned = new HashMap<>();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Mesos-specific behavior
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void initialize() throws Exception {
    +		LOG.info("Initializing Mesos resource master");
    +
    +		workerStore.start();
    +
    +		// create the scheduler driver to communicate with Mesos
    +		schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +		// register with Mesos
    +		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
    +			.clone()
    +			.setCheckpoint(true);
    +
    +		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
    +		if(frameworkID.isEmpty()) {
    +			LOG.info("Registering as new framework.");
    +		}
    +		else {
    +			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
    +			frameworkInfo.setId(frameworkID.get());
    +		}
    +
    +		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
    +		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
    +		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +		// create supporting actors
    +		connectionMonitor = createConnectionMonitor();
    +		launchCoordinator = createLaunchCoordinator();
    +		reconciliationCoordinator = createReconciliationCoordinator();
    +		taskRouter = createTaskRouter();
    +
    +		recoverWorkers();
    +
    +		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +		schedulerDriver.start();
    +	}
    +
    +	protected ActorRef createConnectionMonitor() {
    +		return context().actorOf(
    +			ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +			"connectionMonitor");
    +	}
    +
    +	protected ActorRef createTaskRouter() {
    +		return context().actorOf(
    +			Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
    +			"tasks");
    +	}
    +
    +	protected ActorRef createLaunchCoordinator() {
    +		return context().actorOf(
    +			LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver,
createOptimizer()),
    +			"launchCoordinator");
    +	}
    +
    +	protected ActorRef createReconciliationCoordinator() {
    +		return context().actorOf(
    +			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config,
schedulerDriver),
    +			"reconciliationCoordinator");
    +	}
    +
    +	@Override
    +	public void postStop() {
    +		LOG.info("Stopping Mesos resource master");
    +		super.postStop();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Actor messages
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void handleMessage(Object message) {
    +
    +		// check for Mesos-specific actor messages first
    +
    +		// --- messages about Mesos connection
    +		if (message instanceof Registered) {
    +			registered((Registered) message);
    +		} else if (message instanceof ReRegistered) {
    +			reregistered((ReRegistered) message);
    +		} else if (message instanceof Disconnected) {
    +			disconnected((Disconnected) message);
    +		} else if (message instanceof Error) {
    +			error(((Error) message).message());
    +
    +		// --- messages about offers
    +		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded)
{
    +			launchCoordinator.tell(message, self());
    +		} else if (message instanceof AcceptOffers) {
    +			acceptOffers((AcceptOffers) message);
    +
    +		// --- messages about tasks
    +		} else if (message instanceof StatusUpdate) {
    +			taskStatusUpdated((StatusUpdate) message);
    +		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
    +			// a reconciliation request from a task
    +			reconciliationCoordinator.tell(message, self());
    +		} else if (message instanceof TaskMonitor.TaskTerminated) {
    +			// a termination message from a task
    +			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
    +			taskTerminated(msg.taskID(), msg.status());
    +
    +		} else  {
    +			// message handled by the generic resource master code
    +			super.handleMessage(message);
    +		}
    +	}
    +
    +	/**
    +	 * Called to shut down the cluster (not a failover situation).
    +	 *
    +	 * @param finalStatus The application status to report.
    +	 * @param optionalDiagnostics An optional diagnostics message.
    +     */
    +	@Override
    +	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics)
{
    +
    +		LOG.info("Shutting down and unregistering as a Mesos framework.");
    +		try {
    +			// unregister the framework, which implicitly removes all tasks.
    +			schedulerDriver.stop(false);
    +		}
    +		catch(Exception ex) {
    +			LOG.warn("unable to unregister the framework", ex);
    +		}
    +
    +		try {
    +			workerStore.cleanup();
    +		}
    +		catch(Exception ex) {
    +			LOG.warn("unable to cleanup the ZooKeeper state", ex);
    +		}
    +
    +		context().stop(self());
    +	}
    +
    +	@Override
    +	protected void fatalError(String message, Throwable error) {
    +		// we do not unregister, but cause a hard fail of this process, to have it
    +		// restarted by the dispatcher
    +		LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
    +		LOG.error("Shutting down process");
    +
    +		// kill this process, this will make an external supervisor (the dispatcher) restart
the process
    +		System.exit(EXIT_CODE_FATAL_ERROR);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Worker Management
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Recover framework/worker information persisted by a prior incarnation of the RM.
    +	 */
    +	private void recoverWorkers() throws Exception {
    +		// if this application master starts as part of an ApplicationMaster/JobManager recovery,
    +		// then some worker tasks are most likely still alive and we can re-obtain them
    +		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
    +
    +		if (!tasksFromPreviousAttempts.isEmpty()) {
    +			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
    +
    +			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
    +			List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
    +
    +			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
    +				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
    +
    +				switch(worker.state()) {
    +					case New:
    +						workersInNew.put(extractResourceID(worker.taskID()), worker);
    +						toLaunch.add(launchable);
    +						break;
    +					case Launched:
    +						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
    +						break;
    +					case Released:
    +						workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +						break;
    +				}
    +				taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +			}
    +
    +			// tell the launch coordinator about prior assignments
    +			if(toAssign.size() >= 1) {
    +				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
    +			}
    +			// tell the launch coordinator to launch any new tasks
    +			if(toLaunch.size() >= 1) {
    +				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Plan for some additional workers to be launched.
    +	 *
    +	 * @param numWorkers The number of workers to allocate.
    +     */
    +	@Override
    +	protected void requestNewWorkers(int numWorkers) {
    +
    +		try {
    +			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
    +			List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
    +
    +			// generate new workers into persistent state and launch associated actors
    +			for (int i = 0; i < numWorkers; i++) {
    +				MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
    --- End diff --
    
    Are *worker* and *task* equivalent terms for the same thing? Maybe we could harmonize
the terminology.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent ResourceManager
work.
> Design document:  ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message