spark-reviews mailing list archives

From andrewor14 <...@git.apache.org>
Subject [GitHub] spark pull request: SPARK-4447. Remove layers of abstraction in YA...
Date Thu, 18 Dec 2014 19:54:26 GMT
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3652#discussion_r22066685
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -498,26 +494,160 @@ private[yarn] abstract class YarnAllocator(
        *
        * @param count Number of containers to allocate.
        *              If zero, should still contact RM (as a heartbeat).
    -   * @param pending Number of containers pending allocate. Only used on alpha.
        * @return Response to the allocation request.
        */
    -  protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
    +  private def allocateContainers(count: Int): AllocateResponse = {
    +    addResourceRequests(count)
     
    -  /** Called to release a previously allocated container. */
    -  protected def releaseContainer(container: Container): Unit
    +    // We have already set the container request. Poll the ResourceManager for a response.
    +    // This doubles as a heartbeat if there are no pending container requests.
    +    val progressIndicator = 0.1f
    +    amClient.allocate(progressIndicator)
    +  }
     
    -  /**
    -   * Defines the interface for an allocate response from the RM. This is needed since the alpha
    -   * and stable interfaces differ here in ways that cannot be fixed using other routes.
    -   */
    -  protected trait YarnAllocateResponse {
    +  private def createRackResourceRequests(
    +      hostContainers: ArrayBuffer[ContainerRequest])
    +    : ArrayBuffer[ContainerRequest] = {
    +    // Generate modified racks and new set of hosts under it before issuing requests.
    +    val rackToCounts = new HashMap[String, Int]()
     
    -    def getAllocatedContainers(): JList[Container]
    +    for (container <- hostContainers) {
    +      val candidateHost = container.getNodes.last
    +      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
    +
    +      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    +      if (rack != null) {
    +        var count = rackToCounts.getOrElse(rack, 0)
    +        count += 1
    +        rackToCounts.put(rack, count)
    +      }
    +    }
     
    -    def getAvailableResources(): Resource
    +    val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
    +    for ((rack, count) <- rackToCounts) {
    +      requestedContainers ++= createResourceRequests(
    +        AllocationType.RACK,
    +        rack,
    +        count,
    +        RM_REQUEST_PRIORITY)
    +    }
     
    -    def getCompletedContainersStatuses(): JList[ContainerStatus]
    +    requestedContainers
    +  }
     
    +  private def addResourceRequests(numExecutors: Int): Unit = {
    +    val containerRequests: List[ContainerRequest] =
    +      if (numExecutors <= 0) {
    +        logDebug("numExecutors: " + numExecutors)
    +        List()
    +      } else if (preferredHostToCount.isEmpty) {
    +        logDebug("host preferences is empty")
    +        createResourceRequests(
    +          AllocationType.ANY,
    +          resource = null,
    +          numExecutors,
    +          RM_REQUEST_PRIORITY).toList
    +      } else {
    +        // Request for all hosts in preferred nodes and for numExecutors -
    +        // candidates.size, request by default allocation policy.
    +        val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
    +        for ((candidateHost, candidateCount) <- preferredHostToCount) {
    +          val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
    +
    +          if (requiredCount > 0) {
    +            hostContainerRequests ++= createResourceRequests(
    +              AllocationType.HOST,
    +              candidateHost,
    +              requiredCount,
    +              RM_REQUEST_PRIORITY)
    +          }
    +        }
    +        val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
    +          hostContainerRequests).toList
    +
    +        val anyContainerRequests = createResourceRequests(
    +          AllocationType.ANY,
    +          resource = null,
    +          numExecutors,
    +          RM_REQUEST_PRIORITY)
    +
    +        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
    +          hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
    +
    +        containerRequestBuffer ++= hostContainerRequests
    +        containerRequestBuffer ++= rackContainerRequests
    +        containerRequestBuffer ++= anyContainerRequests
    +        containerRequestBuffer.toList
    +      }
    +
    +    for (request <- containerRequests) {
    +      amClient.addContainerRequest(request)
    +    }
    +
    +    for (request <- containerRequests) {
    +      val nodes = request.getNodes
    +      val hostStr = if (nodes == null || nodes.isEmpty) {
    +        "Any"
    +      } else {
    +        nodes.last
    +      }
    +      logInfo("Container request (host: %s, priority: %s, capability: %s".format(
    +        hostStr,
    +        request.getPriority().getPriority,
    +        request.getCapability))
    +    }
    +  }
    +
    +  private def createResourceRequests(
    +      requestType: AllocationType.AllocationType,
    +      resource: String,
    +      numExecutors: Int,
    +      priority: Int)
    +    : ArrayBuffer[ContainerRequest] = {
    --- End diff --
    
    I would bump this up to the previous line.
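    
    In other words, the return type and opening brace would sit on the same line as the last parameter rather than on a line of their own. Roughly (a formatting sketch only, with the method body elided, not part of the actual patch):
    
        private def createResourceRequests(
            requestType: AllocationType.AllocationType,
            resource: String,
            numExecutors: Int,
            priority: Int): ArrayBuffer[ContainerRequest] = {
          // method body unchanged
        }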




