Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1CBA3200B67 for ; Tue, 16 Aug 2016 18:07:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1B146160AA8; Tue, 16 Aug 2016 16:07:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 11B5D160ABC for ; Tue, 16 Aug 2016 18:07:24 +0200 (CEST) Received: (qmail 63046 invoked by uid 500); 16 Aug 2016 16:07:24 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 62874 invoked by uid 99); 16 Aug 2016 16:07:24 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2016 16:07:24 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 2ECA82C02B6 for ; Tue, 16 Aug 2016 16:07:23 +0000 (UTC) Date: Tue, 16 Aug 2016 16:07:23 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Tue, 16 Aug 2016 16:07:26 -0000 [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422942#comment-15422942 ] ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: 
https://github.com/apache/flink/pull/2315#discussion_r74965913 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLaunch; + final Map workersBeingReturned; + + /** The number of failed tasks since the master became active */ + private int failedTasksSoFar; + + public MesosFlinkResourceManager( + Configuration flinkConfig, + MesosConfiguration mesosConfig, + MesosWorkerStore workerStore, + LeaderRetrievalService leaderRetrievalService, + MesosTaskManagerParameters taskManagerParameters, + Protos.TaskInfo.Builder taskManagerLaunchContext, + int maxFailedTasks, + int numInitialTaskManagers) { + + super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService); + + this.mesosConfig = requireNonNull(mesosConfig); + + this.workerStore = requireNonNull(workerStore); + + this.taskManagerParameters = requireNonNull(taskManagerParameters); + this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext); + this.maxFailedTasks = maxFailedTasks; + + this.workersInNew = new HashMap<>(); + this.workersInLaunch = new HashMap<>(); + this.workersBeingReturned = new HashMap<>(); + } + + // ------------------------------------------------------------------------ + // Mesos-specific behavior + // ------------------------------------------------------------------------ + + @Override + protected void initialize() throws Exception { + LOG.info("Initializing Mesos resource master"); + + workerStore.start(); + + // create the scheduler driver to communicate with Mesos + schedulerCallbackHandler = new SchedulerProxy(self()); + + // register with Mesos + FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() + .clone() + .setCheckpoint(true); + + Option frameworkID = workerStore.getFrameworkID(); + if(frameworkID.isEmpty()) { + LOG.info("Registering as new framework."); + } + else { + LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue()); + frameworkInfo.setId(frameworkID.get()); + } + + MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo); + MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig); + schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false); + + // create supporting actors + connectionMonitor = createConnectionMonitor(); + launchCoordinator = createLaunchCoordinator(); + reconciliationCoordinator = createReconciliationCoordinator(); + taskRouter = createTaskRouter(); + + recoverWorkers(); + + connectionMonitor.tell(new ConnectionMonitor.Start(), self()); + schedulerDriver.start(); + } + + protected ActorRef createConnectionMonitor() { + return context().actorOf( + 
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config), + "connectionMonitor"); + } + + protected ActorRef createTaskRouter() { + return context().actorOf( + Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class), + "tasks"); + } + + protected ActorRef createLaunchCoordinator() { + return context().actorOf( + LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()), + "launchCoordinator"); + } + + protected ActorRef createReconciliationCoordinator() { + return context().actorOf( + ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver), + "reconciliationCoordinator"); + } + + @Override + public void postStop() { + LOG.info("Stopping Mesos resource master"); + super.postStop(); + } + + // ------------------------------------------------------------------------ + // Actor messages + // ------------------------------------------------------------------------ + + @Override + protected void handleMessage(Object message) { + + // check for Mesos-specific actor messages first + + // --- messages about Mesos connection + if (message instanceof Registered) { + registered((Registered) message); + } else if (message instanceof ReRegistered) { + reregistered((ReRegistered) message); + } else if (message instanceof Disconnected) { + disconnected((Disconnected) message); + } else if (message instanceof Error) { + error(((Error) message).message()); + + // --- messages about offers + } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) { + launchCoordinator.tell(message, self()); + } else if (message instanceof AcceptOffers) { + acceptOffers((AcceptOffers) message); + + // --- messages about tasks + } else if (message instanceof StatusUpdate) { + taskStatusUpdated((StatusUpdate) message); + } else if (message instanceof ReconciliationCoordinator.Reconcile) { + // a reconciliation request from a task + 
reconciliationCoordinator.tell(message, self()); + } else if (message instanceof TaskMonitor.TaskTerminated) { + // a termination message from a task + TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message; + taskTerminated(msg.taskID(), msg.status()); + + } else { + // message handled by the generic resource master code + super.handleMessage(message); + } + } + + /** + * Called to shut down the cluster (not a failover situation). + * + * @param finalStatus The application status to report. + * @param optionalDiagnostics An optional diagnostics message. + */ + @Override + protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + + LOG.info("Shutting down and unregistering as a Mesos framework."); + try { + // unregister the framework, which implicitly removes all tasks. + schedulerDriver.stop(false); + } + catch(Exception ex) { + LOG.warn("unable to unregister the framework", ex); + } + + try { + workerStore.cleanup(); + } + catch(Exception ex) { + LOG.warn("unable to cleanup the ZooKeeper state", ex); + } + + context().stop(self()); + } + + @Override + protected void fatalError(String message, Throwable error) { + // we do not unregister, but cause a hard fail of this process, to have it + // restarted by the dispatcher + LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error); + LOG.error("Shutting down process"); + + // kill this process, this will make an external supervisor (the dispatcher) restart the process + System.exit(EXIT_CODE_FATAL_ERROR); + } + + // ------------------------------------------------------------------------ + // Worker Management + // ------------------------------------------------------------------------ + + /** + * Recover framework/worker information persisted by a prior incarnation of the RM. 
+ */ + private void recoverWorkers() throws Exception { + // if this application master starts as part of an ApplicationMaster/JobManager recovery, + // then some worker tasks are most likely still alive and we can re-obtain them + final List tasksFromPreviousAttempts = workerStore.recoverWorkers(); + + if (!tasksFromPreviousAttempts.isEmpty()) { + LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size()); + + List> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size()); + List toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size()); + + for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) { + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); + + switch(worker.state()) { + case New: + workersInNew.put(extractResourceID(worker.taskID()), worker); + toLaunch.add(launchable); + break; + case Launched: + workersInLaunch.put(extractResourceID(worker.taskID()), worker); + toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); + break; + case Released: + workersBeingReturned.put(extractResourceID(worker.taskID()), worker); + break; + } + taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self()); + } + + // tell the launch coordinator about prior assignments + if(toAssign.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self()); + } + // tell the launch coordinator to launch any new tasks + if(toLaunch.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self()); + } + } + } + + /** + * Plan for some additional workers to be launched. + * + * @param numWorkers The number of workers to allocate. 
+ */ + @Override + protected void requestNewWorkers(int numWorkers) { + + try { + List toMonitor = new ArrayList<>(numWorkers); + List toLaunch = new ArrayList<>(numWorkers); + + // generate new workers into persistent state and launch associated actors + for (int i = 0; i < numWorkers; i++) { + MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID()); --- End diff -- Are *worker* and *task* equivalent terms for the same thing? If so, maybe we could harmonize the terminology and use just one of them. > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Reporter: Robert Metzger > Assignee: Eron Wright > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent ResourceManager work. > Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)