flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Date Tue, 16 Aug 2016 16:06:41 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r74965913
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode>
{
    +
    +	/** The Mesos configuration (master and framework info) */
    +	private final MesosConfiguration mesosConfig;
    +
    +	/** The TaskManager container parameters (like container memory size) */
    +	private final MesosTaskManagerParameters taskManagerParameters;
    +
    +	/** Context information used to start a TaskManager Java process */
    +	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +	/** Number of failed Mesos tasks before stopping the application. -1 means infinite.
*/
    +	private final int maxFailedTasks;
    +
    +	/** Callback handler for the asynchronous Mesos scheduler */
    +	private SchedulerProxy schedulerCallbackHandler;
    +
    +	/** Mesos scheduler driver */
    +	private SchedulerDriver schedulerDriver;
    +
    +	private ActorRef connectionMonitor;
    +
    +	private ActorRef taskRouter;
    +
    +	private ActorRef launchCoordinator;
    +
    +	private ActorRef reconciliationCoordinator;
    +
    +	private MesosWorkerStore workerStore;
    +
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +	/** The number of failed tasks since the master became active */
    +	private int failedTasksSoFar;
    +
    +	public MesosFlinkResourceManager(
    +		Configuration flinkConfig,
    +		MesosConfiguration mesosConfig,
    +		MesosWorkerStore workerStore,
    +		LeaderRetrievalService leaderRetrievalService,
    +		MesosTaskManagerParameters taskManagerParameters,
    +		Protos.TaskInfo.Builder taskManagerLaunchContext,
    +		int maxFailedTasks,
    +		int numInitialTaskManagers) {
    +
    +		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
    +
    +		this.mesosConfig = requireNonNull(mesosConfig);
    +
    +		this.workerStore = requireNonNull(workerStore);
    +
    +		this.taskManagerParameters = requireNonNull(taskManagerParameters);
    +		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
    +		this.maxFailedTasks = maxFailedTasks;
    +
    +		this.workersInNew = new HashMap<>();
    +		this.workersInLaunch = new HashMap<>();
    +		this.workersBeingReturned = new HashMap<>();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Mesos-specific behavior
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void initialize() throws Exception {
    +		LOG.info("Initializing Mesos resource master");
    +
    +		workerStore.start();
    +
    +		// create the scheduler driver to communicate with Mesos
    +		schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +		// register with Mesos
    +		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
    +			.clone()
    +			.setCheckpoint(true);
    +
    +		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
    +		if(frameworkID.isEmpty()) {
    +			LOG.info("Registering as new framework.");
    +		}
    +		else {
    +			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
    +			frameworkInfo.setId(frameworkID.get());
    +		}
    +
    +		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
    +		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
    +		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +		// create supporting actors
    +		connectionMonitor = createConnectionMonitor();
    +		launchCoordinator = createLaunchCoordinator();
    +		reconciliationCoordinator = createReconciliationCoordinator();
    +		taskRouter = createTaskRouter();
    +
    +		recoverWorkers();
    +
    +		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +		schedulerDriver.start();
    +	}
    +
    +	protected ActorRef createConnectionMonitor() {
    +		return context().actorOf(
    +			ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +			"connectionMonitor");
    +	}
    +
    +	protected ActorRef createTaskRouter() {
    +		return context().actorOf(
    +			Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
    +			"tasks");
    +	}
    +
    +	protected ActorRef createLaunchCoordinator() {
    +		return context().actorOf(
    +			LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver,
createOptimizer()),
    +			"launchCoordinator");
    +	}
    +
    +	protected ActorRef createReconciliationCoordinator() {
    +		return context().actorOf(
    +			ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config,
schedulerDriver),
    +			"reconciliationCoordinator");
    +	}
    +
    +	@Override
    +	public void postStop() {
    +		LOG.info("Stopping Mesos resource master");
    +		super.postStop();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Actor messages
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	protected void handleMessage(Object message) {
    +
    +		// check for Mesos-specific actor messages first
    +
    +		// --- messages about Mesos connection
    +		if (message instanceof Registered) {
    +			registered((Registered) message);
    +		} else if (message instanceof ReRegistered) {
    +			reregistered((ReRegistered) message);
    +		} else if (message instanceof Disconnected) {
    +			disconnected((Disconnected) message);
    +		} else if (message instanceof Error) {
    +			error(((Error) message).message());
    +
    +		// --- messages about offers
    +		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded)
{
    +			launchCoordinator.tell(message, self());
    +		} else if (message instanceof AcceptOffers) {
    +			acceptOffers((AcceptOffers) message);
    +
    +		// --- messages about tasks
    +		} else if (message instanceof StatusUpdate) {
    +			taskStatusUpdated((StatusUpdate) message);
    +		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
    +			// a reconciliation request from a task
    +			reconciliationCoordinator.tell(message, self());
    +		} else if (message instanceof TaskMonitor.TaskTerminated) {
    +			// a termination message from a task
    +			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
    +			taskTerminated(msg.taskID(), msg.status());
    +
    +		} else  {
    +			// message handled by the generic resource master code
    +			super.handleMessage(message);
    +		}
    +	}
    +
    +	/**
    +	 * Called to shut down the cluster (not a failover situation).
    +	 *
    +	 * @param finalStatus The application status to report.
    +	 * @param optionalDiagnostics An optional diagnostics message.
    +     */
    +	@Override
    +	protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics)
{
    +
    +		LOG.info("Shutting down and unregistering as a Mesos framework.");
    +		try {
    +			// unregister the framework, which implicitly removes all tasks.
    +			schedulerDriver.stop(false);
    +		}
    +		catch(Exception ex) {
    +			LOG.warn("unable to unregister the framework", ex);
    +		}
    +
    +		try {
    +			workerStore.cleanup();
    +		}
    +		catch(Exception ex) {
    +			LOG.warn("unable to cleanup the ZooKeeper state", ex);
    +		}
    +
    +		context().stop(self());
    +	}
    +
    +	@Override
    +	protected void fatalError(String message, Throwable error) {
    +		// we do not unregister, but cause a hard fail of this process, to have it
    +		// restarted by the dispatcher
    +		LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
    +		LOG.error("Shutting down process");
    +
    +		// kill this process, this will make an external supervisor (the dispatcher) restart
the process
    +		System.exit(EXIT_CODE_FATAL_ERROR);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Worker Management
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Recover framework/worker information persisted by a prior incarnation of the RM.
    +	 */
    +	private void recoverWorkers() throws Exception {
    +		// if this application master starts as part of an ApplicationMaster/JobManager recovery,
    +		// then some worker tasks are most likely still alive and we can re-obtain them
    +		final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
    +
    +		if (!tasksFromPreviousAttempts.isEmpty()) {
    +			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
    +
    +			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
    +			List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
    +
    +			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
    +				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
    +
    +				switch(worker.state()) {
    +					case New:
    +						workersInNew.put(extractResourceID(worker.taskID()), worker);
    +						toLaunch.add(launchable);
    +						break;
    +					case Launched:
    +						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
    +						break;
    +					case Released:
    +						workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +						break;
    +				}
    +				taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +			}
    +
    +			// tell the launch coordinator about prior assignments
    +			if(toAssign.size() >= 1) {
    +				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
    +			}
    +			// tell the launch coordinator to launch any new tasks
    +			if(toLaunch.size() >= 1) {
    +				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Plan for some additional workers to be launched.
    +	 *
    +	 * @param numWorkers The number of workers to allocate.
    +     */
    +	@Override
    +	protected void requestNewWorkers(int numWorkers) {
    +
    +		try {
    +			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
    +			List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
    +
    +			// generate new workers into persistent state and launch associated actors
    +			for (int i = 0; i < numWorkers; i++) {
    +				MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
    --- End diff --
    
    Are *worker* and *task* equivalent terms for the same thing? Maybe we could harmonize
the terminology.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message