spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mgummelt <...@git.apache.org>
Subject [GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...
Date Tue, 09 Aug 2016 22:18:34 GMT
Github user mgummelt commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r74155413
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
---
    @@ -358,6 +376,119 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       }
     
       /**
    +   * Checks executor ports if they are within some range of the offered list of port ranges.
    +   *
    +   * @param conf the Spark Config
    +   * @param ports the list of ports to check
    +   * @return true if ports are within range, false otherwise
    +   */
    +  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
    +
    +    def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
    +      ps.exists(r => r._1 <= port & r._2 >= port)
    +    }
    +
    +    val portsToCheck = nonZeroPortValuesFromConfig(conf)
    +    val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
    +    // make sure we have enough ports to allocate per offer
    +    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
    +  }
    +
    +  /**
    +   * Partitions port resources.
    +   *
    +   * @param requestedPorts non-zero ports to assign
    +   * @param offeredResources the resources offered
    +   * @return resources left, port resources to be used.
    +   */
    +  def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
    +    : (List[Resource], List[Resource]) = {
    +    if (requestedPorts.isEmpty) {
    +     return (offeredResources, List[Resource]())
    +    }
    +    // partition port offers
    +    val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
    +
    +    val offeredPortRanges = getRangeResourceWithRoleInfo(portResources.asJava, "ports")
    +
    +    val portsAndRoles = requestedPorts.
    +      map(x => (x, findPortAndGetAssignedRangeRole(x, offeredPortRanges)))
    +
    +    val assignedPortResources = createResourcesFromPorts(portsAndRoles)
    +
    +    // ignore non-assigned port resources, they will be declined implicitly by mesos
    +    // no need for splitting port resources.
    +    (resourcesWithoutPorts, assignedPortResources)
    +  }
    +
    +  val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
    +
    +  /**
    +   * The values of the non-zero ports to be used by the executor process.
    +   * @param conf the spark config to use
    +   * @return the non-zero values of the ports
    +   */
    +  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
    +    managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
    +  }
    +
    +  /** Creates a mesos resource for a specific port number. */
    +  private def createResourcesFromPorts(nonZero: List[(Long, String)]) : List[Resource]
= {
    +    nonZero.flatMap{port => createMesosPortResource(List((port._1, port._1)), Some(port._2))}
    +  }
    +
    +  /**
    +   * A resource can have multiple values in the offer since it can either be from
    +   * a specific role or wildcard.
    +   * Extract role info and port range for every port resource in the offer.
    +   */
    +  private def getRangeResourceWithRoleInfo(resources: JList[Resource], name: String)
    +    : List[(String, List[(Long, Long)])] = {
    +    resources.asScala.filter(_.getName == name).
    +      map{resource =>
    +        (resource.getRole, resource.getRanges.getRangeList.asScala
    +        .map(r => (r.getBegin, r.getEnd)).toList)
    +      }.toList
    +  }
    +
    +  /** Helper to create mesos resources for specific port ranges. */
    +  private def createMesosPortResource(
    +      ranges: List[(Long, Long)],
    +      role: Option[String] = None): List[Resource] = {
    +    ranges.map { range =>
    +      val rangeValue = Value.Range.newBuilder()
    +        .setBegin(range._1)
    +        .setEnd(range._2)
    +      val builder = Resource.newBuilder()
    +        .setName("ports")
    +        .setType(Value.Type.RANGES)
    +        .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
    +      role.foreach { r => builder.setRole(r) }
    +      builder.build()
    +    }
    +  }
    +
    + /**
    +  * Helper to assign a port to an offered range and get the latter's role
    +  * info to use it later on.
    +  */
    +  private def findPortAndGetAssignedRangeRole(
    +       port: Long,
    +       ranges: List[(String, List[(Long, Long)])])
    +    : String = {
    +    val rangePortRole = ranges
    +      .map{p => val tmpList = List(p._2.filter(r => r._1 <= port & r._2
>= port))
    +         (p._1, tmpList.head)}.filterNot{p => p._2.isEmpty}
    +      .head._1
    --- End diff --
    
    `.head` is going to throw an exception when there are no ports in the offer.
    
    Can you also add a test for this scenario?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message