spark-reviews mailing list archives

From skonto <...@git.apache.org>
Subject [GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...
Date Mon, 04 Jul 2016 11:00:14 GMT
Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r69441876
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
---
    @@ -356,4 +374,233 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
       }
     
    +  /**
    +   * Checks whether the executor ports fall within the offered list of port ranges.
    +   *
    +   * @param sc the Spark context
    +   * @param ports the list of port ranges offered
    +   * @return true if the ports are within the ranges, false otherwise
    +   */
    +  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {
    +
    +    def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
    +      ps.exists(r => r._1 <= port && r._2 >= port)
    +    }
    +
    +    val portsToCheck = ManagedPorts.getPortValues(sc.conf)
    +    val nonZeroPorts = portsToCheck.filter(_ != 0)
    +    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
    +    // make sure we have enough ports to allocate per offer
    +    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
    +  }
    +
    +  /**
    +   * Partitions port resources.
    +   *
    +   * @param conf the spark config
    +   * @param ports the ports offered
    +   * @return resources left, port resources to be used and the list of assigned ports
    +   */
    +  def partitionPorts(
    +      conf: SparkConf,
    +      ports: List[Resource])
    +    : (List[Resource], List[Resource], List[Long]) = {
    +    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
    +    val portsToCheck = ManagedPorts.getPortValues(conf)
    +    val nonZeroPorts = portsToCheck.filter(_ != 0)
    +    // reserve non zero ports first
    +    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
    +    // reserve actual port numbers for zero ports - not set by the user
    +    val numOfZeroPorts = portsToCheck.count(_ == 0)
    +    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
    +    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
    +    val (portResourcesLeft, portResourcesToBeUsed) =
    +      createResources(nonZeroResources, zeroResources)
    +    (portResourcesLeft, portResourcesToBeUsed, nonZeroPorts ++ randPorts)
    +  }
    +
    +  private object ManagedPorts {
    +    val portNames = List("spark.executor.port", "spark.blockManager.port")
    --- End diff --
    
    Btw, I don't see a central place where I can reliably retrieve all the ports opened by Spark, so I have to collect them somewhere and keep them updated (they usually don't change much). For example, spark.blockManager.port is found in NettyBlockTransferService; ideally a refactoring is needed for such a design anyway.
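    For reference, the range-check idea in `checkPorts` can be sketched standalone roughly as below (hypothetical names, decoupled from SparkContext and the Mesos Resource API; a zero port means "let the scheduler pick one"):
    
    ```scala
    object PortCheck {
      // A port matches if it falls inside at least one offered (begin, end) range.
      def checkIfInRange(port: Long, ranges: List[(Long, Long)]): Boolean =
        ranges.exists { case (begin, end) => begin <= port && port <= end }
    
      // Every non-zero configured port must fit some offered range, and the
      // offer must contain at least as many ports as we need overall
      // (zero-valued ports still consume one port each once assigned).
      def checkPorts(portsToCheck: List[Long], offered: List[(Long, Long)]): Boolean = {
        val nonZero = portsToCheck.filter(_ != 0)
        val withinRange = nonZero.forall(checkIfInRange(_, offered))
        val capacity = offered.map { case (begin, end) => end - begin + 1 }.sum
        capacity >= portsToCheck.size && withinRange
      }
    }
    ```
    
    So e.g. with `spark.executor.port=7077` and an unset (zero) block manager port, an offer of the single range (7000, 8000) passes, while a configured port of 9000 fails.
    
    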


