spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jon Buffington (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
Date Mon, 21 Sep 2015 23:29:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901609#comment-14901609
] 

Jon Buffington commented on SPARK-5569:
---------------------------------------

Cody,

We were able to work around the issue by destructuring the OffsetRange type into its parts
(i.e., string, long, ints). Below is our flow for reference. Our only concern is whether the
transform operation runs at the driver as we have use a singleton on the driver to persist
the last captured offsets. Can you confirm that the stream transform runs at the driver?

{noformat}
// Recover or create the stream.
val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
  createStreamingContext(checkpointPath)
})
...
def createStreamingContext(checkpointPath: String): StreamingContext = {
  val ssc = new StreamingContext(SparkConf,  Minutes(1))
  ssc.checkpoint(checkpointPath)
  createStream(ssc)
  ssc
}
...

def createStream((ssc: StreamingContext): Unit = {
...
  KafkaUtils.createDirectStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler)
    // "Note that the typecast to HasOffsetRanges will only succeed if it is
    // done in the first method called on the directKafkaStream, not later down
    // a chain of methods. You can use transform() instead of foreachRDD() as
    // your first method call in order to access offsets, then call further
    // Spark methods. However, be aware that the one-to-one mapping between RDD
    // partition and Kafka partition does not remain after any methods that
    // shuffle or repartition, e.g. reduceByKey() or window()."
    .transform { rdd =>
      ...
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      Ranger.captureOffsetRanges(offsetRanges) // De-structure the OffsetRange type into its
parts and save in a singleton.
      ...
      rdd
    }
    ...
}
{noformat}


> Checkpoints cannot reference classes defined outside of Spark's assembly
> ------------------------------------------------------------------------
>
>                 Key: SPARK-5569
>                 URL: https://issues.apache.org/jira/browse/SPARK-5569
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to create a JIRA
to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
>         at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
>         at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
>         at example.CheckpointedExample.main(CheckpointedExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>         at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         ... 52 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message