spark-issues mailing list archives

From "weDataSphere (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-27614) Executor shuffle fetch hang
Date Wed, 01 May 2019 12:01:00 GMT
weDataSphere created SPARK-27614:
------------------------------------

             Summary: Executor shuffle fetch hang
                 Key: SPARK-27614
                 URL: https://issues.apache.org/jira/browse/SPARK-27614
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.0, 2.1.1
            Reporter: weDataSphere


Most of the tasks have completed, but a few individual tasks run for an unusually long time and make no progress at all.

The executor running such a task reports a connection timeout, and its stack trace shows the task thread hung in ShuffleBlockFetcherIterator.next.

The corresponding code is as follows:

    while (!isZombie && result == null) {
      val startFetchWait = System.nanoTime()
      // Blocks with no timeout until a fetch result is enqueued
      result = results.take()
      val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
      shuffleMetrics.incFetchWaitTime(fetchWaitTime)
      // ... (rest of the loop body omitted)
    }

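The loop above exits only once results.take() returns. LinkedBlockingQueue.take() has no timeout, so if no fetch result is ever enqueued, the calling thread waits indefinitely, which is consistent with the hang in the stack trace. The standalone Scala sketch below reproduces that behaviour with a plain queue that nothing ever fills. TakeHangDemo and its members are hypothetical names for illustration, not Spark code.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical demo (not Spark code): a consumer calling take() on a queue that
// no producer ever fills stays blocked until it is interrupted.
object TakeHangDemo {
  @volatile var exitedByInterrupt = false

  def consumerStaysBlocked(): Boolean = {
    val results = new LinkedBlockingQueue[String]() // nothing is ever put here
    val consumer = new Thread(new Runnable {
      def run(): Unit =
        try { results.take(); () } // no timeout: waits indefinitely
        catch { case _: InterruptedException => exitedByInterrupt = true }
    })
    consumer.start()
    consumer.join(200)                  // wait up to 200 ms for the thread to finish
    val stillBlocked = consumer.isAlive // true: it is still parked inside take()
    consumer.interrupt()                // the only way out here, since nothing is put
    consumer.join()
    stillBlocked
  }
}
```

Calling TakeHangDemo.consumerStaysBlocked() returns true: after 200 ms the consumer has not returned from take(), and it exits only because the demo interrupts it.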
 

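LinkedBlockingQueue.take() waits indefinitely for an element, so if no fetch result is ever enqueued, the task blocks forever. We can use poll with a timeout instead, which returns null once the wait expires. The modified code is as follows: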

    currentResult =
      if (!this.blockManager.conf.getBoolean("spark.shuffle.fetch.timeout.enable", true)) {
        results.take()
      } else {
        logInfo("set spark.shuffle.fetch.timeout.enable=true.")
        val GB = 1L << 30
        val MB = 1L << 20
        // Wait longer when more shuffle bytes are still in flight
        val (waitTime, unit) =
          if (bytesInFlight >= 2 * GB) (2, TimeUnit.HOURS)
          else if (bytesInFlight >= GB) (1, TimeUnit.HOURS)
          else if (bytesInFlight >= 512 * MB) (45, TimeUnit.MINUTES)
          else if (bytesInFlight >= 200 * MB) (30, TimeUnit.MINUTES)
          else if (bytesInFlight >= 100 * MB) (20, TimeUnit.MINUTES)
          else if (bytesInFlight >= 10 * MB) (15, TimeUnit.MINUTES)
          else (10, TimeUnit.MINUTES)
        // Unlike take(), poll returns null once the timeout expires
        val r = results.poll(waitTime, unit)
        if (r == null) {
          val cost = s"Waited $waitTime $unit for a shuffle block, giving up!"
          logError(cost)
          throw new SparkException(cost)
        }
        r
      }

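The proposal can be tried outside Spark with a minimal sketch, assuming a plain LinkedBlockingQueue in place of Spark's results queue. ShuffleFetchTimeoutSketch, timeoutFor and awaitResult are hypothetical names. The size tiers are copied from the patch above, and java.util.concurrent.TimeoutException stands in for SparkException so the block has no Spark dependency.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, TimeoutException}

// Hypothetical standalone sketch of the proposed patch; not Spark's actual code.
object ShuffleFetchTimeoutSketch {
  private val MB = 1L << 20
  private val GB = 1L << 30

  // Same tiers as the patch: more bytes in flight => longer allowed wait.
  def timeoutFor(bytesInFlight: Long): (Long, TimeUnit) =
    if (bytesInFlight >= 2 * GB) (2L, TimeUnit.HOURS)
    else if (bytesInFlight >= GB) (1L, TimeUnit.HOURS)
    else if (bytesInFlight >= 512 * MB) (45L, TimeUnit.MINUTES)
    else if (bytesInFlight >= 200 * MB) (30L, TimeUnit.MINUTES)
    else if (bytesInFlight >= 100 * MB) (20L, TimeUnit.MINUTES)
    else if (bytesInFlight >= 10 * MB) (15L, TimeUnit.MINUTES)
    else (10L, TimeUnit.MINUTES)

  // Bounded wait: poll(timeout, unit) returns null when nothing arrives in time,
  // so the caller fails with an exception instead of hanging forever.
  def awaitResult[T <: AnyRef](queue: LinkedBlockingQueue[T], waitTime: Long, unit: TimeUnit): T = {
    val r = queue.poll(waitTime, unit)
    if (r == null) {
      throw new TimeoutException(s"Waited $waitTime $unit for a shuffle block, giving up")
    }
    r
  }
}
```

For example, timeoutFor(300L << 20) (300 MB in flight) yields (30, MINUTES), and awaitResult on a queue that stays empty throws a TimeoutException after the wait rather than blocking forever. Scaling the timeout with bytesInFlight, as the patch does, gives large outstanding fetches more time before the task is failed. A single fixed timeout would be simpler, but it might abort slow yet legitimate fetches of large blocks.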


