spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (SPARK-7237) Many user provided closures are not actually cleaned
Date Wed, 29 Apr 2015 19:59:07 GMT

     [ https://issues.apache.org/jira/browse/SPARK-7237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-7237:
-----------------------------------

    Assignee: Andrew Or  (was: Apache Spark)

> Many user provided closures are not actually cleaned
> ----------------------------------------------------
>
>                 Key: SPARK-7237
>                 URL: https://issues.apache.org/jira/browse/SPARK-7237
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Andrew Or
>            Assignee: Andrew Or
>
> It appears that many operations throughout Spark do not actually clean the closures
> provided by the user.
> Simple reproduction:
> {code}
> def test(): Unit = {
>   sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
> }
> {code}
> Clearly, the inner closure is not serializable, but when we serialize it we should expect
> the ClosureCleaner to fail fast and complain loudly about return statements. Instead, we get
> a mysterious stack trace:
> {code}
> java.io.NotSerializableException: java.lang.Object
> Serialization stack:
> 	- object not serializable (class: java.lang.Object, value: java.lang.Object@6db4b914)
> 	- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
> 	- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
> 	- field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$4, type: interface scala.Function1)
> 	- object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
> 	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:314)
> {code}
> What might have caused this? If you look at the code for mapPartitions, you'll notice
> that we never explicitly clean the closure passed in by the user. Instead, we only wrap it
> in another closure and clean the outer one:
> {code}
> def mapPartitions[U: ClassTag](
>       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
>     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
>   }
> {code}
> This is not sufficient, however, because the user-provided closure is a field of the
> outer closure, and that field does not get cleaned. If we rewrite the above to clean the
> inner closure preemptively:
> {code}
> def mapPartitions[U: ClassTag](
>       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>     val cleanedFunc = sc.clean(f)
>     new MapPartitionsRDD(
>       this,
>       (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedFunc(iter),
>       preservesPartitioning)
>   }
> {code}
> Then we get the exception that we would expect by running the test() example above:
> {code}
> org.apache.spark.SparkException: Return statements aren't allowed in Spark closures
> 	at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:357)
> 	at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
> 	at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:215)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:1759)
> 	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:640)
> {code}
> This needs to be done in a few places throughout Spark.
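
The `nonLocalReturnKey1$1` field in the first stack trace can be reproduced without Spark. The following is a minimal sketch, assuming Scala 2 semantics, where a `return` inside a closure compiles to a non-local return that captures a plain `java.lang.Object` key. It uses plain Java serialization as a stand-in for Spark's serializer. The names `NonLocalReturnDemo`, `isSerializable`, `testInner`, `testWrapped` and `testWithoutReturn` are hypothetical and not part of Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical demo object; none of these names come from Spark.
object NonLocalReturnDemo {

  // Returns true if plain Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // The `return` inside the lambda is a non-local return from testInner,
  // so the lambda captures a java.lang.Object as its return key.
  def testInner(): Boolean = {
    val f: Iterator[Int] => Iterator[Int] = iter => { return false; iter }
    isSerializable(f)
  }

  // Wrapping the closure, as mapPartitions does, does not help: the outer
  // closure holds the inner one as captured state and serializes it too.
  def testWrapped(): Boolean = {
    val f: Iterator[Int] => Iterator[Int] = iter => { return false; iter }
    val func = (index: Int, iter: Iterator[Int]) => f(iter)
    isSerializable(func)
  }

  // The same closure without `return` captures nothing unserializable.
  def testWithoutReturn(): Boolean = {
    val f: Iterator[Int] => Iterator[Int] = iter => iter
    isSerializable(f)
  }

  def main(args: Array[String]): Unit = {
    println(s"closure with return is serializable: ${testInner()}")
    println(s"wrapped closure with return is serializable: ${testWrapped()}")
    println(s"closure without return is serializable: ${testWithoutReturn()}")
  }
}
```

Under these assumptions the first two checks should report `false` and the third `true`. This is consistent with the description above: the serialization failure surfaces from the outer closure, while the actual cause sits in the user closure it wraps.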



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


