spark-issues mailing list archives

From "Nadav Samet (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17766) Write ahead log corruption on a toy project
Date Mon, 03 Oct 2016 17:42:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15542956#comment-15542956 ]

Nadav Samet commented on SPARK-17766:
-------------------------------------

I don't believe there's anything environmental going on. In this standalone setup I just run "sbt run" to get a local in-process cluster; it's default, vanilla Spark (and it fails in the same way when I use spark-submit against a local standalone cluster).

> Write ahead log corruption on a toy project
> -------------------------------------------
>
>                 Key: SPARK-17766
>                 URL: https://issues.apache.org/jira/browse/SPARK-17766
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Nadav Samet
>
> The write-ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). The application then refuses to run, failing with this exception:
> {code}
> 2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> ...skipping...
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
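> For reference, here is a minimal, self-contained sketch (my own illustration, not Spark's WAL reader; the object name MisalignedKryoDemo and the payload strings are made up) of how a stream that has lost record alignment produces exactly this failure. If the tail of a WAL segment is only partially written when the process dies, a reader can end up decoding arbitrary bytes as a varint class ID, hence "Encountered unregistered class ID":
> {code}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{Input, Output}
>
> object MisalignedKryoDemo {
>   def main(args: Array[String]): Unit = {
>     val kryo = new Kryo()
>     val out = new Output(1024, -1)
>     kryo.writeClassAndObject(out, "a record that was fully written")
>     kryo.writeClassAndObject(out, "a record that was cut short")
>     val bytes = out.toBytes
>     // Start reading a few bytes past the record boundary, as a stand-in
>     // for a WAL block whose contents no longer line up with its framing.
>     // Kryo reads whatever bytes it finds as a varint class ID and throws
>     // KryoException: "Encountered unregistered class ID: <some number>".
>     val misaligned = new Input(bytes.drop(3))
>     kryo.readClassAndObject(misaligned)
>   }
> }
> {code}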
> Code:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark._
> import org.apache.spark.streaming._
> object ProtoDemo {
>   def createContext(dirName: String) = {
>     val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     /*
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
>     */
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(dirName)
>     val lines = ssc.socketTextStream("127.0.0.1", 9999)
>     val words = lines.flatMap(_.split(" "))
>     val pairs = words.map(word => (word, 1))
>     val wordCounts = pairs.reduceByKey(_ + _)
>     val runningCounts = wordCounts.updateStateByKey[Int] {
>       (values: Seq[Int], oldValue: Option[Int]) =>
>         val s = values.sum
>         Some(oldValue.fold(s)(_ + s))
>     }
>     // Print the first ten elements of each RDD generated in this DStream to the console
>     runningCounts.print()
>     ssc
>   }
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
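> Worth noting: the corruption is triggered by killing the process mid-write. As a hedged mitigation sketch (my addition, not part of the report), Spark has a documented setting that stops the streaming context gracefully on JVM shutdown, so Ctrl-C or a plain kill drains in-flight batches and closes the WAL before exiting; a kill -9 would still reproduce the issue:
> {code}
>     // Added next to the other conf.set calls in createContext: have Spark
>     // install a shutdown hook that calls ssc.stop(stopGracefully = true)
>     // when the JVM receives SIGINT/SIGTERM.
>     conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
> {code}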
> Steps to reproduce:
> 1. I put the code in a repository; clone it: git clone https://github.com/thesamet/spark-issue
> 2. In one terminal: {{while true; do nc -l localhost 9999; done}}
> 3. In a second terminal, run "sbt run".
> 4. Type a few lines into the netcat terminal.
> 5. Kill the streaming application (Ctrl-C).
> 6. Go back to step 3 and repeat until you see the exception above.
> I tried the above with a local filesystem and also with S3, and got the same result.
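> For the S3 run specifically: the Spark Streaming guide recommends enabling the two closeFileAfterWrite settings (the ones commented out in the code above) when the write-ahead log lives on a file system without flush support, such as S3, so each WAL write goes into its own closed file:
> {code}
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
> {code}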


