spark-issues mailing list archives

From "Jack Hu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-7624) Task scheduler delay is increasing over time in Spark local mode
Date Fri, 15 May 2015 03:05:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jack Hu updated SPARK-7624:
---------------------------
    Description: 
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. It receives
JSON strings from a socket at a rate of 50 events per second. It runs well for the first 6 hours
(although the minor GC count per minute keeps increasing the whole time), but after that the
scheduler delay of every task increases noticeably, from 10 ms to 100 ms; after 10 hours of
running, the task delay is about 800 ms and CPU usage has risen from 2% to 30%. As a result, the
streaming job can no longer finish within one batch interval (5 seconds). I dumped the Java heap
after 16 hours and found about 200,000 {{org.apache.spark.scheduler.local.ReviveOffers}}
objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code and
found only one place that can put a {{ReviveOffers}} into the Akka {{LightArrayRevolverScheduler}}:
{{LocalActor::reviveOffers}}:
{code}
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  val tasks = scheduler.resourceOffers(offers).flatten
  for (task <- tasks) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask)
  }

  if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
    // Try to reviveOffers after 1 second, because the scheduler may be waiting for a locality timeout.
    // (This scheduleOnce call is the suspect: nothing deduplicates the pending ReviveOffers messages.)
    context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
  }
}
{code}

I removed the last three lines of this method (the whole {{if}} block, which was introduced
by https://issues.apache.org/jira/browse/SPARK-4939), and the job then ran smoothly for 20 hours,
with the scheduler delay staying around 10 ms the whole time. So there must be some condition
under which {{ReviveOffers}} gets scheduled over and over again. I am not sure why this happens,
but I believe it is the root cause of this issue.
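
To make the suspected feedback loop concrete, here is a toy model (my own sketch, not Spark code; the rate of one external trigger per second is an assumption): if every fired {{ReviveOffers}} reschedules itself while a task set stays active, and external calls to {{reviveOffers}} keep adding one more timer entry each second, then the pending entries grow linearly and the cumulative firings grow quadratically, which is the right order of magnitude for the counts reported below.
{code}
object ReviveOffersGrowthModel {
  def main(args: Array[String]): Unit = {
    var pending = 0L            // ReviveOffers entries sitting in the timer wheel
    var fired = 0L              // cumulative scheduleOnce firings
    val externalPerSecond = 1L  // assumed rate of submitTasks/statusUpdate triggers

    for (second <- 1L to 18L * 3600L) {
      fired += pending          // each pending entry fires after ~1s and reschedules itself
      pending += externalPerSecond
      if (second % (6L * 3600L) == 0L)
        println(s"after ${second / 3600}h: pending=$pending, fired=$fired")
    }
    // fired after 18h is ~2.1e9, the same order as the 1,685,343,501 count reported below
  }
}
{code}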

My Spark settings:
# Memory: 3G
# CPU: 8 cores
# Streaming batch interval: 5 seconds
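
For reference, these settings correspond to roughly the following context setup (a sketch under my assumptions: the app name is invented, and in local mode the 3G of driver memory has to be supplied at JVM launch, e.g. {{spark-submit --driver-memory 3g}}, rather than in code):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[8] = local mode with 8 cores, i.e. the single LocalActor backend
// discussed above; the 5-second batch interval is set on the context.
val conf = new SparkConf().setAppName("OrderStream").setMaster("local[8]")
val ssc = new StreamingContext(conf, Seconds(5))
{code}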

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER
  ).mapPartitions(
    Order(_),                       // parse the JSON strings into Order objects
    preservePartitioning = true)
val mresult = input.map(
    v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))
  ).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize),
    Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)  // inflate each window 4x
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    // note: nextInt(5) is in [0, 4], so the i == 5 pass counts nothing
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("T: " + t + ": " + c)
  }
})
{code}
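
Note that each 5-second batch submits several small jobs here: one for {{tempr.count.print}} and five more for the filter-and-count loop in {{foreachRDD}}. Task sets are therefore being created continuously, so (if my reading above is right) {{scheduler.activeTaskSets}} is frequently non-empty and the suspect {{if}} block gets a steady stream of chances to schedule another {{ReviveOffers}}.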

========================================================
Updated at 2015-05-15
I instrumented the suspect {{scheduleOnce}} line in {{LocalActor::reviveOffers}} and counted how many times it was invoked: {color:red}*1685343501*{color} times after 18 hours of running.
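
One possible mitigation, as a sketch only (my own idea, not Spark's actual fix; the names {{GuardedReviveSketch}} and {{revivePending}} are made up, and the queue stands in for the Akka timer wheel): gate the {{scheduleOnce}} call with a flag so at most one {{ReviveOffers}} is ever pending, and clear the flag when the message is received.
{code}
import scala.collection.mutable

object GuardedReviveSketch {
  case object ReviveOffers
  val timerQueue = mutable.Queue[Any]()  // stands in for LightArrayRevolverScheduler
  var revivePending = false

  def reviveOffers(launchedTasks: Int, activeTaskSets: Int): Unit = {
    // Gate: only schedule a retry if none is already pending.
    if (launchedTasks == 0 && activeTaskSets > 0 && !revivePending) {
      revivePending = true
      timerQueue.enqueue(ReviveOffers)   // stands in for scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }

  def onReviveOffersReceived(): Unit = {
    revivePending = false                // clear the gate before retrying
    reviveOffers(launchedTasks = 0, activeTaskSets = 1)
  }

  def main(args: Array[String]): Unit = {
    // 1000 external triggers (submitTasks/statusUpdate) while nothing can launch:
    for (_ <- 1 to 1000) reviveOffers(launchedTasks = 0, activeTaskSets = 1)
    println(s"pending ReviveOffers: ${timerQueue.size}")  // prints 1, not 1000
  }
}
{code}
With such a gate in place the timer wheel would never hold more than one pending entry, so the heap growth and the ever-increasing firing rate described above could not occur.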


> Task scheduler delay is increasing over time in Spark local mode
> -----------------------------------------------------------------
>
>                 Key: SPARK-7624
>                 URL: https://issues.apache.org/jira/browse/SPARK-7624
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1
>            Reporter: Jack Hu
>              Labels: delay, schedule
>


