spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (SPARK-7174) Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool
Date Mon, 27 Apr 2015 17:45:40 GMT

     [ https://issues.apache.org/jira/browse/SPARK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Apache Spark reassigned SPARK-7174:
-----------------------------------

    Assignee: Apache Spark

> Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking
the Akka thread pool
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7174
>                 URL: https://issues.apache.org/jira/browse/SPARK-7174
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.2, 1.3.1
>            Reporter: Shixiong Zhu
>            Assignee: Apache Spark
>
> HeartbeatReceiver will TaskScheduler.executorHeartbeatReceived, which is a blocking operation
because "TaskScheduler.executorHeartbeatReceived" will call 
> {code}
>     blockManagerMaster.driverEndpoint.askWithReply[Boolean](
>       BlockManagerHeartbeat(blockManagerId), 600 seconds)
> {code}
> finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g.,
the reply may be dispatched to the same thread of the ask operation. So the reply cannot be
processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.
> jstack log:
> {code}
> "sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000
nid=0x725 waiting on condition [0x00007f2b1d6d0000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
> 	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> 	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> 	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
> 	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
> 	at scala.concurrent.Await$.result(package.scala:107)
> 	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
> 	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
> 	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
> 	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message