spark-dev mailing list archives

From Surendranauth Hiraman <suren.hira...@velos.io>
Subject Re: Java IO Stream Corrupted - Invalid Type AC?
Date Wed, 18 Jun 2014 12:49:56 GMT
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

We've been trying for two weeks to get our production flow (roughly 50-70
stages, with a few forks and joins of up to 20 branches) to run end to end,
without success, and we have hit other problems besides this one. For
example, saving to HDFS sometimes hangs on a couple of tasks that print
nothing to their logs and use no CPU. For testing, our input data is 10 GB
across 320 input splits, and the flow generates roughly 200-300 GB of
intermediate and final data.

Here are the config changes in our work-in-progress version:


        import org.apache.spark.SparkConf

        val conf = new SparkConf()

        conf.set("spark.executor.memory", "14g")     // TODO make this configurable

        // shuffle configs
        conf.set("spark.default.parallelism", "320") // TODO make this configurable
        conf.set("spark.shuffle.consolidateFiles", "true")

        conf.set("spark.shuffle.file.buffer.kb", "200")
        conf.set("spark.reducer.maxMbInFlight", "96")

        conf.set("spark.rdd.compress", "true")

        // we ran into a problem with the default timeout of 60 seconds;
        // this is also being set in the master's spark-env.sh. Not sure if
        // it needs to be in both places
        conf.set("spark.worker.timeout", "180")

        // akka settings
        conf.set("spark.akka.threads", "300")
        conf.set("spark.akka.timeout", "180")
        conf.set("spark.akka.frameSize", "100")
        conf.set("spark.akka.batchSize", "30")
        conf.set("spark.akka.askTimeout", "30")

        // block manager
        conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
        conf.set("spark.blockManagerHeartBeatMs", "80000")
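The two "TODO make this configurable" lines could be handled by keeping the hard-coded values as defaults and letting each one be overridden at launch. A minimal sketch, assuming overrides arrive as JVM system properties with the same key (for example `-Dspark.default.parallelism=640`); `JobSettings`, `resolve` and `fromSystemProperties` are hypothetical names, not Spark APIs. The helper is plain Scala so it runs without Spark; only the commented `conf.setAll(...)` line assumes `spark-core` on the classpath.

```scala
// Hypothetical helper (not a Spark API): keeps the hard-coded values as
// defaults and lets a JVM system property with the same key override them.
object JobSettings {
  // Defaults taken from the two "TODO make this configurable" lines above.
  val defaults: Map[String, String] = Map(
    "spark.executor.memory"     -> "14g",
    "spark.default.parallelism" -> "320"
  )

  // For every default key, use the override if present, else the default.
  // Keys that appear only in `overrides` are ignored.
  def resolve(defaults: Map[String, String],
              overrides: Map[String, String]): Map[String, String] =
    defaults.map { case (key, value) => key -> overrides.getOrElse(key, value) }

  // Overrides come from the current JVM's system properties,
  // e.g. -Dspark.default.parallelism=640 on the driver command line.
  def fromSystemProperties(): Map[String, String] =
    resolve(defaults, sys.props.toMap)
}

// In the driver (requires spark-core on the classpath):
//   conf.setAll(JobSettings.fromSystemProperties())
```

One reason to resolve values before calling `set`: `new SparkConf()` already loads any `spark.*` system properties, and a later hard-coded `conf.set` for the same key replaces that value.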

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pwendell@gmail.com> wrote:

> Out of curiosity - are you guys using speculation, shuffle
> consolidation, or any other non-default option? If so that would help
> narrow down what's causing this corruption.
>
> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
> <suren.hiraman@velos.io> wrote:
> > Matt/Ryan,
> >
> > Did you make any headway on this? My team is running into this also.
> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> > generate 100s of GBs in the flow itself.
> >
> > -Suren
> >
> >
> >
> >
> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.ryan@gmail.com> wrote:
> >
> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >> cluster (no modifications to Spark or its dependencies). The error
> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> >> edge list (GraphX worked beautifully on smaller data).
> >>
> >> Here's the stacktrace (it's quite similar to yours
> >> https://imgur.com/7iBA4nJ ).
> >>
> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job
> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100
> >> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC
> >>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>         org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>         org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> >>         java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>         java.lang.Thread.run(Thread.java:662)
> >> Driver stacktrace:
> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at scala.Option.foreach(Option.scala:236)
> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
> >>
> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <sowen@cloudera.com> wrote:
> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mkielo@oculusinfo.com> wrote:
> >> >> I'm trying to run some Spark code on a cluster, but I keep running into a
> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
> >> >> involves analyzing ~50GB of data (some operations involve sorting) and
> >> >> then writing the results out to a JSON file. I'm running the analysis on
> >> >> each of the data's ~10 columns and have never had a successful run. My
> >> >> program seems to run for a varying amount of time on each attempt
> >> >> (roughly 5-30 minutes), but it always terminates with this error.
> >> >
> >> > I can tell you that this usually means somewhere something wrote
> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
> >> > is a header value.
> >> >
> >> > I don't obviously see where/how that could happen, but maybe it rings
> >> > a bell for someone. This could happen if an OutputStream is reused
> >> > across object serializations but new ObjectOutputStreams are opened,
> >> > for example.
> >>
> >
> >
> >
> > --
> >
> > SUREN HIRAMAN, VP TECHNOLOGY
> > Velos
> > Accelerating Machine Learning
> >
> > 440 NINTH AVENUE, 11TH FLOOR
> > NEW YORK, NY 10001
> > O: (917) 525-2466 ext. 105
> > F: 646.349.4063
> > E: suren.hiraman@velos.io
> > W: www.velos.io
>
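Sean's explanation in the quoted thread can be reproduced with plain `java.io`, outside Spark. A minimal sketch (the `AcDemo` object and its methods are hypothetical): every `ObjectOutputStream` writes the 4-byte stream header `AC ED 00 05` when it is constructed, so writing through two of them onto one underlying stream leaves a second header in the middle of the data, and a single `ObjectInputStream` reading it back finds `0xAC` where it expects a type code. This shows the general failure mode only; it does not identify where, if anywhere, Spark's shuffle or caching path does this.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
  ObjectOutputStream, StreamCorruptedException}

// Hypothetical demo object: reproduces "invalid type code: AC" outside Spark.
object AcDemo {
  // Anti-pattern: a new ObjectOutputStream per object on ONE shared stream.
  // Each constructor writes the header AC ED 00 05, so the second header
  // ends up in the middle of the serialized data.
  def writeWithSeparateStreams(objs: Seq[AnyRef]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    objs.foreach { o =>
      val out = new ObjectOutputStream(bytes)
      out.writeObject(o)
      out.flush()
    }
    bytes.toByteArray
  }

  // Consistent pattern: one ObjectOutputStream, hence exactly one header.
  def writeWithOneStream(objs: Seq[AnyRef]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    objs.foreach(o => out.writeObject(o))
    out.close()
    bytes.toByteArray
  }

  // Reads n objects with a single ObjectInputStream, which consumes exactly
  // one header; a StreamCorruptedException is returned as Left(message).
  def tryRead(data: Array[Byte], n: Int): Either[String, Seq[AnyRef]] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try Right(Seq.fill(n)(in.readObject()))
    catch { case e: StreamCorruptedException => Left(e.getMessage) }
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val objs = Seq("first", "second")
    val header = writeWithOneStream(Nil).take(4).map(b => f"${b & 0xff}%02X")
    println(header.mkString(" "))                               // AC ED 00 05
    println(tryRead(writeWithOneStream(objs), objs.size))       // both objects read back
    println(tryRead(writeWithSeparateStreams(objs), objs.size)) // Left(invalid type code: AC)
  }
}
```

The first object in the corrupted stream still deserializes; the failure only appears when the reader reaches the second header, which is consistent with jobs that run for a while before hitting the error. Pairing writers and readers one to one (a single `ObjectOutputStream` for the lifetime of the underlying stream, read back by a single `ObjectInputStream`) avoids it.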



