spark-issues mailing list archives

From "Matthew Fishkin (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
Date Tue, 02 Jan 2018 23:26:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308882#comment-16308882 ]

Matthew Fishkin edited comment on SPARK-22942 at 1/2/18 11:25 PM:
------------------------------------------------------------------

I would expect that to work too. I'm more curious why the NullPointerException occurs when none of the data is null.

Interestingly, I found the following. When you change from 
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
{code}
to
{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works. 

But I am pretty sure "l.infos is null and r.infos is not null" is equivalent to "l.infos is null". If one side of a full outer join is null, the other must be defined (given that neither input contains null `infos` values).
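The equivalence claimed above can be checked on a small Spark-free model of the full outer join on `name`. This is an illustrative stand-in, not Spark code: each side is a `Map` from name to infos, and a side with no match is `None` (playing the role of SQL NULL). The model assumes neither input contains a null `infos`; if one did, a right-only row could have a null `r.infos` and the two filters would no longer agree.

```scala
// Spark-free model of the full outer join on `name` from this issue (an illustration,
// not Spark's implementation). None plays the role of SQL NULL for an unmatched side.
// Assumption: neither input contains a null `infos`, so None only ever means "no match".
object OuterJoinModel {
  case class Info(number: Int, color: String)

  // name -> infos, mirroring left = Seq(a, b) and right = Seq(a2, c, d)
  val left: Map[String, Seq[Info]] = Map(
    "a" -> Seq(Info(1, "blue"), Info(2, "black"), Info(5, "white")),
    "b" -> Seq(Info(1, "blue"), Info(2, "black")))

  val right: Map[String, Seq[Info]] = Map(
    "a" -> Seq(Info(3, "yellow"), Info(5, "white"), Info(4, "orange")),
    "c" -> Seq(Info(5, "white"), Info(4, "orange")),
    "d" -> Seq(Info(4, "orange"), Info(2, "black")))

  // Full outer join: one row per name from either side; a missing side becomes None.
  val joined: Seq[(String, Option[Seq[Info]], Option[Seq[Info]])] =
    (left.keySet ++ right.keySet).toSeq.sorted.map(n => (n, left.get(n), right.get(n)))

  // "l.infos is null"
  val leftMissing = joined.filter { case (_, l, _) => l.isEmpty }

  // "l.infos is null and r.infos is not null"
  val leftMissingRightPresent = joined.filter { case (_, l, r) => l.isEmpty && r.isDefined }
}
```

Both filters keep only `c` and `d`, because every joined row has at least one defined side; that property is what makes the extra `is not null` check redundant on this data.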


(Author: mjfish93)

> Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>
> I ran into an interesting issue when trying to do a `filter` on a dataframe that has columns that were added using a UDF. I am able to replicate the problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a  = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
> val d = Record("d", Seq(orange, black))
> {code}
> Create two DataFrames (we will call them left and right):
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join those two DataFrames with a full outer join (so two of our columns are now nullable):
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist only in the right DataFrame:
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black`, which will be true if `r_infos` contains _black_ as a color:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.functions.udf
> def hasBlack = (s: Seq[Row]) => {
>   s.exists{ case Row(num: Int, color: String) =>
>     color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected. 
> *However*, when I try to filter `rightBreakdown`, it fails.
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
>   ... 58 elided
> Caused by: java.lang.NullPointerException
>   at $anonfun$hasBlack$1.apply(<console>:41)
>   at $anonfun$hasBlack$1.apply(<console>:40)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
>   ... 114 more
> {code}
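The stack trace suggests why the UDF sees null even though the data contains none: the exception is raised while the query is being optimized (`QueryExecution.optimizedPlan`, then `EliminateOuterJoin.canFilterOutNull`, then `ScalaUDF.eval`), not while rows are processed. As the rule's name and the trace suggest, it appears to evaluate each filter condition with its input columns set to null, to decide whether the condition removes null-extended rows. The Spark-free sketch below models that probe under stated simplifications: a condition is a Scala function from a possibly-null array (color strings standing in for `Seq[Row]`) to `Option[Boolean]`, with `None` as SQL NULL, and `canFilterOutNull` here is a hypothetical re-creation, not Spark's source.

```scala
// Simplified, Spark-free model (not Spark's implementation) of the optimizer's null probe.
// A condition whose result is NULL (None) or false for an all-null input is treated as
// "filters out null-extended rows", which would let the rule narrow the outer join.
object NullProbeModel {
  // A condition on one `infos` column: the input may be null; None stands for SQL NULL.
  type Condition = Seq[String] => Option[Boolean]

  // Hypothetical re-creation of the probe: evaluate the condition on a null input.
  def canFilterOutNull(cond: Condition): Boolean =
    cond(null).forall(_ == false)

  val isNull: Condition = s => Some(s == null)     // e.g. "l.infos is null"
  val isNotNull: Condition = s => Some(s != null)  // e.g. "r.infos is not null"

  // "has_black == true" with a UDF body like the report's: dereferences s unconditionally,
  // so probing it with null throws NullPointerException.
  val hasBlackUnsafe: Condition = s => Some(s.contains("black"))

  // Null-safe variant: a null array yields SQL NULL (None) instead of throwing.
  val hasBlackSafe: Condition = s => Option(s).map(_.contains("black"))
}
```

In the report's own UDF, the same guard might look like `(s: Seq[Row]) => Option(s).map(_.exists { case Row(_, color: String) => color == "black" })`; this is an untested sketch, and returning `false` for a null array would avoid the exception as well. The trace also shows the probe running inside `List.exists`, which stops at the first condition that returns true. If `r.infos is not null` happens to be probed before the UDF, the UDF is never called with null; this may explain why the workaround in the comment above avoids the exception, although the order in which Spark lists the conditions is not confirmed here.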



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
