spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-13316) "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards
Date Wed, 16 Mar 2016 04:50:33 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196758#comment-15196758
] 

Apache Spark commented on SPARK-13316:
--------------------------------------

User 'mwws' has created a pull request for this issue:
https://github.com/apache/spark/pull/11753

> "SparkException: DStream has not been initialized" when restoring StreamingContext from
checkpoint and the dstream is created afterwards
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13316
>                 URL: https://issues.apache.org/jira/browse/SPARK-13316
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Jacek Laskowski
>            Priority: Minor
>
> I faced the issue today but [it was already reported on SO|http://stackoverflow.com/q/35090180/1305344]
a couple of days ago and the reason is that a dstream is registered after a StreamingContext
has been recreated from checkpoint.
> It _appears_ that...no dstreams must be registered after a StreamingContext has been
recreated from checkpoint. It is *not* obvious at first.
> The code:
> {code}
> def createStreamingContext(): StreamingContext = {
>     val ssc = new StreamingContext(sparkConf, Duration(1000))
>     ssc.checkpoint(checkpointDir)
>     ssc
> }
> val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
> val socketStream = ssc.socketTextStream(...)
> socketStream.checkpoint(Seconds(1))
> socketStream.foreachRDD(...)
> {code}
> It should be described in docs at the very least and/or checked in the code when the
streaming computation starts.
> The exception is as follows:
> {code}
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab
has not been initialized
>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
>   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
>   ... 43 elided
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message