spark-issues mailing list archives

From "zhoukang (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-21539) Job should not be aborted when dynamic allocation is enabled or spark.executor.instances larger than current allocated number by yarn
Date Wed, 26 Jul 2017 13:01:14 GMT
zhoukang created SPARK-21539:
--------------------------------

             Summary: Job should not be aborted when dynamic allocation is enabled or spark.executor.instances larger than current allocated number by yarn
                 Key: SPARK-21539
                 URL: https://issues.apache.org/jira/browse/SPARK-21539
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.2.0, 2.1.0, 1.6.1
            Reporter: zhoukang


This applies to Spark on YARN.
Right now, when a TaskSet cannot run on any node or host, that is, when blacklistedEverywhere
is true in TaskSetManager#abortIfCompleteBlacklisted, the job is aborted.
However, if dynamic allocation is enabled, we should wait for YARN to allocate a new NodeManager
so that the job can execute successfully. We should also report the blacklist information to YARN
so that it does not assign the same node that was blacklisted by the TaskScheduler.
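
The proposed behavior can be sketched as a small decision function. This is only an illustration of the idea, not Spark code: SchedulerState, AbortPolicy, and their fields are hypothetical names introduced here, and the inputs are assumed to be available to the scheduler.

```scala
// Hypothetical model of the proposal; none of these names exist in Spark.
final case class SchedulerState(
    blacklistedEverywhere: Boolean,    // outcome of the blacklist check described above
    dynamicAllocationEnabled: Boolean, // value of spark.dynamicAllocation.enabled
    requestedExecutors: Int,           // e.g. spark.executor.instances
    allocatedExecutors: Int)           // executors YARN has granted so far

object AbortPolicy {
  /** True when YARN may still hand out executors on other nodes. */
  def mayGetMoreExecutors(s: SchedulerState): Boolean =
    s.dynamicAllocationEnabled || s.requestedExecutors > s.allocatedExecutors

  /** Abort only when the TaskSet is blacklisted everywhere and no new executors are expected. */
  def shouldAbort(s: SchedulerState): Boolean =
    s.blacklistedEverywhere && !mayGetMoreExecutors(s)
}
```

With this sketch, a TaskSet that is blacklisted everywhere while only 4 of 10 requested executors are allocated would keep waiting instead of aborting. A real change would presumably also need a timeout so that the job does not wait forever.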
How to reproduce?
1. Set up a YARN cluster with 5 nodes, and give one node (node1) many more CPU cores and much more
memory than the others, so that YARN keeps launching containers on this node even after it is
blacklisted by the TaskScheduler.
2. Modify BlockManager#registerWithExternalShuffleServer:
{code:java}
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
  diskBlockManager.localDirs.map(_.toString),
  diskBlockManager.subDirsPerLocalDir,
  shuffleManager.getClass.getName)

val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
val SLEEP_TIME_SECS = 5

for (i <- 1 to MAX_ATTEMPTS) {
  try {
    // --- added for the reproduction: always fail registration on node1 ---
    if (shuffleServerId.host.equals("node1's address")) {
      throw new Exception
    }
    // --- end of added logic ---
    // Synchronous and will throw an exception if we cannot connect.
    shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
      shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
    return
  } catch {
    case e: Exception if i < MAX_ATTEMPTS =>
      logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
        + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
      Thread.sleep(SLEEP_TIME_SECS * 1000)
    case NonFatal(e) =>
      throw new SparkException("Unable to register with external shuffle server due to : " +
        e.getMessage, e)
  }
}
{code}
Add the logic between the "added for the reproduction" comments.
3. Set spark.shuffle.service.enabled to true and enable the external shuffle service on YARN.
YARN will then keep launching executors on node1, but they fail because they cannot register
with the shuffle service.
The job is then aborted.
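
The Spark-side settings for the steps above can be collected as follows. This is a sketch under assumptions: ReproConf and toSubmitArgs are hypothetical helpers, and spark.blacklist.enabled is assumed to be the relevant switch on the 2.1.0 and 2.2.0 versions listed above (it does not apply to 1.6.1). The NodeManager-side setup of the shuffle service is not shown.

```scala
// Hypothetical helper that gathers the settings used by the reproduction.
object ReproConf {
  val settings: Seq[(String, String)] = Seq(
    "spark.shuffle.service.enabled" -> "true",   // step 3: use the external shuffle service
    "spark.dynamicAllocation.enabled" -> "true", // the case this issue is about
    "spark.blacklist.enabled" -> "true"          // assumption: blacklisting must be switched on
  )

  /** Renders the settings as spark-submit arguments. */
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

ReproConf.toSubmitArgs(ReproConf.settings) yields a flat list of --conf key=value pairs that can be appended to a spark-submit command line.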


