spark-dev mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: [0.9.0] Possible deadlock in shutdown hook?
Date Fri, 07 Feb 2014 07:30:15 GMT
That's genius.  Of course when a worker is told to shut down it should
interrupt its worker threads -- I think that would address this issue.

Are you thinking to put

running.map(_.jobId).foreach { handleJobCancellation }

at the top of the StopDAGScheduler block?
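
Something like this is what I have in mind -- a rough, untested sketch
based on my reading of the 0.9.0 DAGScheduler, so treat the surrounding
case and names as approximate:

  // Sketch: cancel the jobs behind every running stage before the
  // scheduler tears down, so executors get the kill signal and their
  // task threads stop writing to disk.
  case StopDAGScheduler =>
    running.map(_.jobId).foreach { handleJobCancellation }
    // ...then the existing logic that marks remaining jobs as failed
    // and stops the scheduler.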


On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
<tathagata.das1565@gmail.com> wrote:

> It's highly likely that the executor's threadpool that runs the tasks is
> the only set of threads that writes to disk. The tasks are designed to
> be interrupted when the corresponding job is cancelled. So a reasonably
> simple way could be to actually cancel the currently active jobs, which
> would send the signal to the worker to stop the tasks. Currently, the
> DAGScheduler (
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> ) does not seem to actually cancel the jobs, only mark them as failed. So it
> may be a simple addition.
>
> There may be some complications with the external spilling of shuffle data
> to disk not stopping immediately when the task is marked for killing. Gotta
> try it out.
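>
> Roughly the kind of check I mean in the spill path -- only a sketch, not
> the actual spilling code; the iterator and output stream here are
> stand-ins:
>
>   // Stop spilling promptly once the task's thread has been interrupted
>   // (i.e. its job was cancelled), instead of finishing the whole write.
>   def spill(records: Iterator[Array[Byte]], out: java.io.OutputStream) {
>     while (records.hasNext) {
>       if (Thread.currentThread().isInterrupted) {
>         throw new InterruptedException("task killed, aborting spill")
>       }
>       out.write(records.next())
>     }
>   }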
>
> TD
>
> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <andrew@andrewash.com> wrote:
>
> > There is probably just one threadpool that has task threads -- is it
> > possible to enumerate and interrupt just those?  We may need to string a
> > reference to that threadpool through to the shutdown thread to make that
> > happen.
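> >
> > To make that concrete, roughly what I'm picturing -- a sketch only, where
> > taskPool stands in for whatever pool the executor actually uses for task
> > threads:
> >
> >   import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
> >
> >   // Stand-in for the executor's task-launch pool.
> >   val taskPool =
> >     Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]
> >
> >   Runtime.getRuntime.addShutdownHook(new Thread("interrupt task threads") {
> >     override def run() {
> >       // shutdownNow() interrupts exactly the threads running tasks,
> >       // without having to enumerate every thread in the JVM.
> >       taskPool.shutdownNow()
> >       taskPool.awaitTermination(10, TimeUnit.SECONDS)
> >     }
> >   })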
> >
> >
> > On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mridul@gmail.com>
> > wrote:
> >
> > > Ideally, interrupting the thread writing to disk should be sufficient
> > > - though since we are in the middle of shutdown when this is happening,
> > > it is best-effort anyway.
> > > Identifying which threads to interrupt will be interesting, since most
> > > of them are driven by threadpools and we can't list all threads and
> > > interrupt all of them!
> > >
> > >
> > > Regards,
> > > Mridul
> > >
> > >
> > > On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <andrew@andrewash.com> wrote:
> > > > I think the solution where we stop the writing threads and then let the
> > > > deleting threads completely clean up is the best option since the final
> > > > state doesn't have half-deleted temp dirs scattered across the cluster.
> > > >
> > > > How feasible do you think it'd be to interrupt the other threads?
> > > >
> > > >
> > > > On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mridul@gmail.com>
> > > > wrote:
> > > >
> > > >> Looks like a pathological corner case here - where the delete
> > > >> thread is not getting run while the OS is busy prioritizing the
> > > >> thread writing data (probably with heavy gc too).
> > > >> Ideally, the delete thread would list files, remove them, and then
> > > >> fail when it tries to remove the non-empty directory (since the other
> > > >> thread might be creating more in parallel).
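> > > >>
> > > >> Something along these lines is what I mean -- a best-effort sketch,
> > > >> not the current Utils.deleteRecursively:
> > > >>
> > > >>   // Delete what we can; if another thread is still creating files,
> > > >>   // the final dir.delete() simply fails and we move on instead of
> > > >>   // holding up shutdown.
> > > >>   def bestEffortDelete(dir: java.io.File) {
> > > >>     Option(dir.listFiles()).getOrElse(Array.empty[java.io.File]).foreach { f =>
> > > >>       if (f.isDirectory) bestEffortDelete(f) else f.delete()
> > > >>     }
> > > >>     if (!dir.delete()) {
> > > >>       System.err.println("could not delete " + dir + " (still non-empty?)")
> > > >>     }
> > > >>   }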
> > > >>
> > > >>
> > > >> Regards,
> > > >> Mridul
> > > >>
> > > >>
> > > >> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <andrew@andrewash.com>
> > > >> wrote:
> > > >> > Got a repro locally on my MBP (the other was on a CentOS machine).
> > > >> >
> > > >> > Build spark, run a master and a worker with the sbin/start-all.sh
> > > >> > script, then run this in a shell:
> > > >> >
> > > >> > import org.apache.spark.storage.StorageLevel._
> > > >> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> > > >> > s.count
> > > >> >
> > > >> > After about a minute, this line appears in the shell logging output:
> > > >> >
> > > >> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
> > > >> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
> > > >> > heart beats: 57510ms exceeds 45000ms
> > > >> >
> > > >> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
> > > >> > CoarseGrainedExecutorBackend.
> > > >> >
> > > >> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.
> > > >> > I waited around for 15min then kill -9'd the JVM and restarted the
> > > >> > process.
> > > >> >
> > > >> > I wonder if what's happening here is that the threads that are
> > > >> > spewing data to disk (as that parallelize and persist would do) can
> > > >> > write to disk faster than the cleanup threads can delete from disk.
> > > >> >
> > > >> > What do you think of that theory?
> > > >> >
> > > >> >
> > > >> > Andrew
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mridul@gmail.com>
> > > >> > wrote:
> > > >> >>
> > > >> >> Shutdown hooks should not take 15 mins as you mentioned!
> > > >> >> On the other hand, how busy was your disk when this was happening?
> > > >> >> (either due to spark or something else?)
> > > >> >>
> > > >> >> It might just be that there was a lot of stuff to remove?
> > > >> >>
> > > >> >> Regards,
> > > >> >> Mridul
> > > >> >>
> > > >> >>
> > > >> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <andrew@andrewash.com>
> > > >> >> wrote:
> > > >> >> > Hi Spark devs,
> > > >> >> >
> > > >> >> > Occasionally when hitting Ctrl-C in the scala spark shell on
> > > >> >> > 0.9.0 one of my workers goes dead in the spark master UI.  I'm
> > > >> >> > using the standalone cluster and didn't ever see this while using
> > > >> >> > 0.8.0 so I think it may be a regression.
> > > >> >> >
> > > >> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with
> > > >> >> > jstack and jmap -heap, it doesn't respond unless I add the -F
> > > >> >> > force flag.  The heap isn't full, but there are some interesting
> > > >> >> > bits in the jstack.  Poking around a little, I think there may be
> > > >> >> > some kind of deadlock in the shutdown hooks.
> > > >> >> > Below are the threads I think are most interesting:
> > > >> >> >
> > > >> >> > Thread 14308: (state = BLOCKED)
> > > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> > > >> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> > > >> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> > > >> >> >  - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
> > > >> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
> > > >> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
> > > >> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
> > > >> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> > > >> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
> > > >> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
> > > >> >> >
> > > >> >> > Thread 3865: (state = BLOCKED)
> > > >> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> > > >> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> > > >> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> > > >> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
> > > >> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
> > > >> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> > > >> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> > > >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> > > >> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
> > > >> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> > > >> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> > > >> >> >
> > > >> >> >
> > > >> >> > Thread 3987: (state = BLOCKED)
> > > >> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
> > > >> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> > > >> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
> > > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
> > > >> >> >  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
> > > >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
> > > >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
> > > >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
> > > >> >> >  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> > > >> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
> > > >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)
> > > >> >> >
> > > >> >> >
> > > >> >> > I think what happened here is that thread 14308 received the
> > > >> >> > akka "shutdown" message and called System.exit().  This started
> > > >> >> > thread 3865, which is the JVM shutting itself down.  Part of that
> > > >> >> > process is running the shutdown hooks, so it started thread 3987.
> > > >> >> > That thread is the shutdown hook from addShutdownHook() in
> > > >> >> > DiskBlockManager.scala, which looks like this:
> > > >> >> >
> > > >> >> >   private def addShutdownHook() {
> > > >> >> >     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
> > > >> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
> > > >> >> >       override def run() {
> > > >> >> >         logDebug("Shutdown hook called")
> > > >> >> >         localDirs.foreach { localDir =>
> > > >> >> >           try {
> > > >> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
> > > >> >> >           } catch {
> > > >> >> >             case t: Throwable =>
> > > >> >> >               logError("Exception while deleting local spark dir: " + localDir, t)
> > > >> >> >           }
> > > >> >> >         }
> > > >> >> >
> > > >> >> >         if (shuffleSender != null) {
> > > >> >> >           shuffleSender.stop()
> > > >> >> >         }
> > > >> >> >       }
> > > >> >> >     })
> > > >> >> >   }
> > > >> >> >
> > > >> >> > It goes through and deletes the directories recursively.  I was
> > > >> >> > thinking there might be some issues with concurrently-running
> > > >> >> > shutdown hooks deleting things out from underneath each other
> > > >> >> > (shutdown hook javadocs say they're all started in parallel if
> > > >> >> > multiple hooks are added) causing the File.list() in that last
> > > >> >> > thread to take quite some time.
> > > >> >> >
> > > >> >> > While I was looking through the stacktrace the JVM finally
> > > >> >> > exited (after 15-20min at least) so I won't be able to debug
> > > >> >> > more until this bug strikes again.
> > > >> >> >
> > > >> >> > Any ideas on what might be going on here?
> > > >> >> >
> > > >> >> > Thanks!
> > > >> >> > Andrew
> > > >> >
> > > >> >
> > > >>
> > >
> >
>
