flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3134][yarn] asynchronous YarnJobManager...
Date Tue, 15 Dec 2015 09:03:38 GMT
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1450#discussion_r47611363
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
    @@ -754,6 +538,243 @@ class YarnJobManager(
           memoryLimit
         }
       }
    +
    +  /**
    +    * Heartbeats with the resource manager and handles container updates.
    +    */
    +  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
    +
    +    private var client : AMRMClientAsync[ContainerRequest] = null
    +
    +    override def onError(e: Throwable): Unit = {
    +      self ! decorateMessage(
    +        StopYarnSession(
    +          FinalApplicationStatus.FAILED,
    +          "Error in communication with Yarn resource manager: " + e.getMessage)
    +      )
    +    }
    +
    +    override def getProgress: Float = {
    +      runningContainers.toFloat / numTaskManagers
    +    }
    +
    +    override def onShutdownRequest(): Unit = {
    +    }
    +
    +    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = {
    +    }
    +
    +    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
    +
    +      // TODO change this
    +      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers,
" +
    +        s"failed containers $failedContainers, " +
    +        s"allocated containers ${allocatedContainersList.size}.")
    +
    +      val completedContainerStatuses = statuses.asScala
    +      val idStatusMap = completedContainerStatuses
    +        .map(status => (status.getContainerId, status)).toMap
    +
    +      completedContainerStatuses.foreach {
    +        status => log.info(s"Container ${status.getContainerId} is completed " +
    +          s"with diagnostics: ${status.getDiagnostics}")
    +      }
    +
    +      // get failed containers (returned containers are also completed, so we have to
    +      // distinguish if it was running before).
    +      val (completedContainers, remainingRunningContainers) = runningContainersList
    +        .partition(idStatusMap contains _.getId)
    +
    +      completedContainers.foreach {
    +        container =>
    +          val status = idStatusMap(container.getId)
    +          failedContainers += 1
    +          runningContainers -= 1
    +          log.info(s"Container ${status.getContainerId} was a running container. " +
    +            s"Total failed containers $failedContainers.")
    +          val detail = status.getExitStatus match {
    +            case -103 => "Vmem limit exceeded";
    +            case -104 => "Pmem limit exceeded";
    +            case _ => ""
    +          }
    +          messageListener foreach {
    +            _ ! decorateMessage(
    +              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in "
+
    +                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
    +            )
    +          }
    +      }
    +
    +      runningContainersList = remainingRunningContainers
    +
    +      // maxFailedContainers == -1 is infinite number of retries.
    +      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers)
{
    +        val msg = s"Stopping YARN session because the number of failed " +
    +          s"containers ($failedContainers) exceeded the maximum failed container " +
    +          s"count ($maxFailedContainers). This number is controlled by " +
    +          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
    +          s"setting. By default its the number of requested containers"
    +        log.error(msg)
    +        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
    +
    +      }
    +
    +      allocateContainers()
    +
    +    }
    +
    +    override def onContainersAllocated(containers: JavaList[Container]): Unit = {
    +      println("onContainersAllocated")
    --- End diff --
    
    The `println("onContainersAllocated")` looks like a leftover debug statement; please remove it or replace it with a `log.debug` call.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact the Infrastructure team at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message