spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan <...@insidin.com>
Subject Spark-1.6.0-preview2 trackStateByKey exception restoring state
Date Mon, 23 Nov 2015 15:22:18 GMT
Hi guys,

I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
release and I'm encountering an exception when trying to restore previously
checkpointed state in spark streaming.

Use case:
- execute a stateful Spark streaming job using trackStateByKey
- interrupt / kill the job
- start the job again (without any code changes or cleaning out the
checkpoint directory)

Upon this restart, I encounter the exception below. The nature of the
exception makes me think either I am doing something wrong, or there's a
problem with this use case for the new trackStateByKey API.

I uploaded my job code (
https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's basically
just a modified version of the spark streaming example
StatefulNetworkWordCount (that had already been updated to use
trackStateByKey). My version however includes the usage of
StreamingContext.getOrCreate to actually read the checkpointed state when
the job is started, leading to the exception below.

Just to make sure: using StreamingContext.getOrCreate should still be
compatible with using the trackStateByKey API?

Thanx,
Jan

15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
marking it as stopped

java.lang.IllegalArgumentException: requirement failed

at scala.Predef$.require(Predef.scala:221)

at
org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)

at scala.Option.map(Option.scala:145)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)

at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)

at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)

at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()

at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)

at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)

at
org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)

at
org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)

at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-0-preview2-trackStateByKey-exception-restoring-state-tp15318.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
Mime
View raw message