flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
Date Fri, 18 Mar 2016 05:05:33 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201019#comment-15201019 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user Astralidea commented on a diff in the pull request:

    https://github.com/apache/flink/pull/948#discussion_r56613503
  
    --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/FlinkScheduler.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler
    +
    +import java.io.File
    +import java.util.{List => JList}
    +
    +import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
    +import org.apache.mesos.Protos.TaskState._
    +import org.apache.mesos.Protos._
    +import org.apache.mesos.{Scheduler, SchedulerDriver}
    +import org.slf4j.LoggerFactory
    +import scopt.OptionParser
    +
    +import scala.collection.JavaConversions._
    +
    +object FlinkScheduler extends Scheduler with SchedulerUtils {
    +
    +  val LOG = LoggerFactory.getLogger(FlinkScheduler.getClass)
    +  var jobManager: Option[Thread] = None
    +  var currentConfiguration: Option[Configuration] = None
    +  var taskManagers: Set[RunningTaskManager] = Set()
    +  var taskManagerCount = 0
    +  // address of the http server that hosts the configuration files
    +  var httpConfigServerAddress: Option[String] = None
    +
    +  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = { }
    +
    +  override def disconnected(driver: SchedulerDriver): Unit = { }
    +
    +  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { }
    +
    +  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {
    +    LOG.warn(s"Slave lost: ${slaveId.getValue}, removing all task managers matching slaveId")
    +    taskManagers = taskManagers.filter(_.slaveId != slaveId)
    +  }
    +
    +  override def error(driver: SchedulerDriver, message: String): Unit = { }
    +
    +  override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID,
    +                                slaveId: SlaveID, data: Array[Byte]): Unit = { }
    +
    +  override def registered(driver: SchedulerDriver, frameworkId: FrameworkID,
    +                          masterInfo: MasterInfo): Unit = { }
    +
    +  override def executorLost(driver: SchedulerDriver, executorId: ExecutorID,
    +                            slaveId: SlaveID, status: Int): Unit = {
    +    LOG.warn(s"Executor ${executorId.getValue} lost with status $status on slave $slaveId")
    +  }
    +
    +  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
    +    val taskId = status.getTaskId.getValue
    +    val slaveId = status.getSlaveId.getValue
    +    LOG.info(
    +      s"statusUpdate received from taskId: $taskId slaveId: $slaveId [${status.getState.name()}]")
    +
    +    status.getState match {
    +      case TASK_FAILED | TASK_FINISHED | TASK_KILLED | TASK_LOST | TASK_ERROR =>
    +        LOG.info(s"Lost taskManager with TaskId: $taskId on slave: $slaveId")
    +        taskManagers = taskManagers.filter(_.taskId != status.getTaskId)
    +      case _ =>
    +        LOG.debug(s"No action to take for statusUpdate ${status.getState.name()}")
    +    }
    +  }
    +
    +  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
    +    // we will combine all resources from the same slave and then launch a single task rather
    +    // than one per offer; this way we have better utilization and less memory wasted on overhead.
    +    for((slaveId, offers) <- offers.groupBy(_.getSlaveId)) {
    +      val tasks = constructTaskInfoFromOffers(slaveId, offers.toList)
    +      driver.launchTasks(offers.map(_.getId), tasks)
    +    }
    +  }
    +
    +  def constructTaskInfoFromOffers(slaveId: SlaveID, offers: List[Offer]): Seq[TaskInfo] = {
    +    val maxTaskManagers = GlobalConfiguration.getInteger(
    +      TASK_MANAGER_COUNT_KEY, DEFAULT_TASK_MANAGER_COUNT)
    +    val requiredMem = GlobalConfiguration.getFloat(
    +      TASK_MANAGER_MEM_KEY, DEFAULT_TASK_MANAGER_MEM)
    +    val requiredCPU = GlobalConfiguration.getFloat(
    +      TASK_MANAGER_CPU_KEY, DEFAULT_TASK_MANAGER_CPU.toFloat)
    +    val requiredDisk = GlobalConfiguration.getFloat(
    +      TASK_MANAGER_DISK_KEY, DEFAULT_TASK_MANGER_DISK)
    +    val attributeConstraints = parseConstraintString(GlobalConfiguration.getString(
    +      TASK_MANAGER_OFFER_ATTRIBUTES_KEY, DEFAULT_TASK_MANAGER_OFFER_ATTRIBUTES))
    +    val role = GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_ROLE_KEY, DEFAULT_MESOS_FRAMEWORK_ROLE)
    +    val uberJarLocation = GlobalConfiguration.getString(
    +      FLINK_UBERJAR_LOCATION_KEY, null)
    +    val nativeLibPath = GlobalConfiguration.getString(
    +      MESOS_NATIVE_JAVA_LIBRARY_KEY, DEFAULT_MESOS_NATIVE_JAVA_LIBRARY)
    +
    +    // combine offers into a single chunk
    +    val totalMemory = offers.flatMap(_.getResourcesList
    +      .filter(x => x.getName == "mem" && x.getRole == role)
    +      .map(_.getScalar.getValue)).sum
    +
    +    val totalCPU = offers.flatMap(_.getResourcesList
    +      .filter(x => x.getName == "cpus" && x.getRole == role)
    +      .map(_.getScalar.getValue)).sum
    +
    +    val totalDisk = offers.flatMap(_.getResourcesList
    +      .filter(x => x.getName == "disk" && x.getRole == role)
    +      .map(_.getScalar.getValue)).sum
    +
    +    val portRanges = offers.flatMap(_.getResourcesList
    +      .filter(x => x.getName == "ports" && x.getRole == role)
    +      .flatMap(_.getRanges.getRangeList))
    +
    +    val ports = getNPortsFromPortRanges(2, portRanges)
    +
    +    val offerAttributes = toAttributeMap(offers.flatMap(_.getAttributesList))
    +
    +    // check if all constraints are satisfied
    +    //  0. We need more task managers
    +    //  1. Attribute constraints
    +    //  2. Memory requirements
    +    //  3. CPU requirements
    +    //  4. Port requirements
    +    val meetsRequirements =
    +      taskManagers.size < maxTaskManagers &&
    +      totalCPU >= requiredCPU &&
    +      totalMemory >= requiredMem &&
    +      totalDisk >= requiredDisk &&
    +      ports.size == 2 &&
    +      matchesAttributeRequirements(attributeConstraints, offerAttributes)
    +
    +    LOG.info((if (meetsRequirements) "Accepting " else "Declining ") +
    +      s"offer(s) from slave ${slaveId.getValue} " +
    +      s"offered [cpus: $totalCPU | mem: $totalMemory | disk: $totalDisk] " +
    +      s"required [cpus: $requiredCPU | mem: $requiredMem | disk: $requiredDisk]")
    +
    +    if (meetsRequirements) {
    +      // create task Id
    +      taskManagerCount += 1
    +
    +      // create executor
    +      val command = createTaskManagerCommand(requiredMem.toInt)
    +      val log4jUrl = s"${httpConfigServerAddress.get}/log4j.properties"
    +      val executorInfo = createExecutorInfo(s"$taskManagerCount", role,
    +        Set(uberJarLocation, log4jUrl), command, nativeLibPath)
    +
    +      // create task
    +      val taskId = TaskID.newBuilder().setValue(s"TaskManager_$taskManagerCount").build()
    +      val taskInfo = createTaskInfo(
    +        "taskManager", taskId, slaveId, role, requiredMem,
    +        requiredCPU, requiredDisk, ports, executorInfo, currentConfiguration.get)
    +
    +      Seq(taskInfo)
    +    } else {
    +      Seq()
    --- End diff --
    
    There is a bug here: if you do not call driver.declineOffer(offer.getId), Mesos will keep holding this offer for the FlinkScheduler, so other frameworks cannot use it and eventually no resources are left.
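    
    A minimal sketch of the fix, assuming the resourceOffers shape from the diff above (constructTaskInfoFromOffers is the helper defined there; declineOffer(OfferID) is a standard SchedulerDriver call) — hypothetical, not part of the PR:
    
    override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
      for ((slaveId, slaveOffers) <- offers.groupBy(_.getSlaveId)) {
        val tasks = constructTaskInfoFromOffers(slaveId, slaveOffers.toList)
        if (tasks.nonEmpty) {
          // enough resources: launch the task managers on the combined offers
          driver.launchTasks(slaveOffers.map(_.getId), tasks)
        } else {
          // not enough resources: return the offers to the Mesos allocator
          // so other frameworks can use them
          slaveOffers.foreach(offer => driver.declineOffer(offer.getId))
        }
      }
    }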


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: New Components
>            Reporter: Robert Metzger
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink: https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
