spark-issues mailing list archives

From "Gal Topper (JIRA)" <>
Subject [jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException
Date Sun, 26 Mar 2017 19:49:41 GMT


Gal Topper commented on SPARK-19476:

The example in the description does indeed fully materialize the iterator, but only to reproduce
the issue as simply as possible. To be clear, the real code I'm running doesn't do that :-)!
Instead, it pulls items from the iterator on demand. The workaround I described essentially
ensures that only the original executor thread ever calls next() on the iterator, which is still
done on demand, not all at once.
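For illustration, here is a minimal sketch of that workaround (the helper name, queue type, and buffer size are my own choices, not the actual code): the thread that received the iterator from Spark is the only one that calls next(), and a worker thread consumes items through a small bounded queue, so rows are still pulled on demand rather than materialized up front.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical sketch of the workaround: only the original executor thread
// touches the iterator; a worker thread consumes items via a bounded queue.
def processPartition[T](iterator: Iterator[T])(work: T => Unit): Unit = {
  // Bounded queue: put() blocks when full, so the iterator is pulled on demand.
  val queue = new ArrayBlockingQueue[Option[T]](16)

  // Worker thread: drains the queue until the end-of-stream marker (None).
  val worker = new Thread(() =>
    Iterator.continually(queue.take()).takeWhile(_.isDefined).foreach(_.foreach(work))
  )
  worker.start()

  // Original executor thread: the only caller of iterator.next().
  iterator.foreach(item => queue.put(Some(item)))
  queue.put(None) // signal end of stream
  worker.join()
}
```

Because the queue is bounded, backpressure from the worker naturally throttles how fast the executor thread advances the iterator.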

In my own experience, using threads works perfectly well with the exception of this issue,
and I've never read anything in the docs that discourages users from doing so. +1 for a note,
though, if that's really not something the authors intended.

> Running threads in Spark DataFrame foreachPartition() causes NullPointerException
> ---------------------------------------------------------------------------------
>                 Key: SPARK-19476
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>            Priority: Minor
> First reported on [Stack overflow|].
> I use multiple threads inside foreachPartition(), which works great for me except for
when the underlying iterator is TungstenAggregationIterator. Here is a minimal code snippet
to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
>     import scala.concurrent.ExecutionContext.Implicits.global
>     import scala.concurrent.duration.Duration
>     import scala.concurrent.{Await, Future}
>     import org.apache.spark.SparkContext
>     import org.apache.spark.sql.SQLContext
>     object Reproduce extends App {
>       val sc = new SparkContext("local", "reproduce")
>       val sqlContext = new SQLContext(sc)
>       import sqlContext.implicits._
>       val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>       df.foreachPartition { iterator =>
>         val f = Future(iterator.toVector)
>         Await.result(f, Duration.Inf)
>       }
>     }
> {code}
> When I run this, I get:
> {noformat}
>     java.lang.NullPointerException
>         at
>         at
>         at scala.collection.Iterator$$anon$
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens: TungstenAggregationIterator uses a
ThreadLocal variable that returns null when accessed from any thread other than the one that
originally obtained the iterator from Spark. From examining the code, this behavior does not
appear to differ between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not documented,
as far as I'm aware.
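For context, the ThreadLocal failure mode can be illustrated outside Spark with a minimal sketch (this is plain JVM behavior, not Spark code): a ThreadLocal populated on one thread is unset on every other thread, so get() returns null there.

```scala
// Minimal illustration (not Spark code): a ThreadLocal set on one thread
// reads back as null from any other thread.
val tl = new ThreadLocal[String]
tl.set("owner value")

var seenOnOtherThread: Option[String] = Some("sentinel")
val t = new Thread(() => { seenOnOtherThread = Option(tl.get) })
t.start()
t.join() // join() makes the other thread's write visible here

println(tl.get)            // "owner value" on the owning thread
println(seenOnOtherThread) // None: tl.get returned null on the other thread
```

This is exactly what a Future body does when it calls next() on an iterator whose state lives in a ThreadLocal owned by the original executor thread.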

This message was sent by Atlassian JIRA
