spark-issues mailing list archives

From "Linbo (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15282) UDF function executed twice when filter on new column created by withColumn
Date Tue, 24 May 2016 02:22:13 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Linbo updated SPARK-15282:
--------------------------
    Description: 
I found this problem on Spark 1.6.1, and according to [~tedyu] the behavior is the same on the current master branch.
Basically, I used a UDF with df.withColumn to create a "new" column, then filtered on the values of that new column and called show (an action). The UDF (which withColumn uses to compute the new column) is executed twice (duplicated). If I filter on an "old" column instead, the UDF runs only once, as expected. In the example code below, `filteredOnNewColumnDF.show` demonstrates the problem.

{code:title=spark-shell|borderStyle=solid}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+

{code}
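My reading of what happens (an inference on my part, not confirmed in this ticket): the optimizer pushes the filter on the derived column down past the projection, substituting the UDF expression into the filter condition, so the UDF is evaluated once in the filter and again in the projection for every row that passes. The duplicated expression should be visible in the plan:

{code:title=inspecting the plan|borderStyle=solid}
// The duplicated UDF should show up twice in the optimized plan: once in the
// pushed-down Filter condition and once in the Project list.
filteredOnNewColumnDF.explain(true)
{code}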


Updated: users can avoid problems from this duplicate-execution behaviour by making sure the UDF is deterministic.
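For reference, a possible workaround sketch on 1.6 (my own suggestion, not from this ticket): materialize the derived column with cache() before filtering, so the filter reads cached values instead of re-evaluating the UDF expression. In later Spark versions (2.3+), the UDF could instead be marked non-deterministic with asNondeterministic(), which blocks the filter pushdown that duplicates the expression.

{code:title=workaround sketch (untested)|borderStyle=solid}
// Cache and materialize the DataFrame that carries the UDF output;
// the subsequent filter then scans the cached column.
val cachedDF = df.withColumn("new", udfFunc(df("old"))).cache()
cachedDF.count()                       // forces one UDF run per row
cachedDF.filter("new <> 'a1'").show()  // no further "running udf(...)" expected
{code}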
 
Let me describe our use case in more detail. In our project, we first generate a DataFrame with a "fileName" column and a "url" column, then use a UDF (inside withColumn()) to download the file at each URL, and finally filter out '{}' (empty) results before writing to HDFS:
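A minimal sketch of that pipeline for illustration (the download helper, the sample data, and the output path are all hypothetical):

{code:title=use-case sketch (hypothetical)|borderStyle=solid}
import org.apache.spark.sql.functions._
import scala.io.Source
import scala.util.Try

val filesDF = sc.parallelize(Seq(
  ("f1", "http://example.com/f1.json")      // hypothetical row
)).toDF("fileName", "url")

// Hypothetical download helper: returns the URL body, or "{}" on failure.
val download = udf((url: String) => Try(Source.fromURL(url).mkString).getOrElse("{}"))

val withContent = filesDF.withColumn("content", download(filesDF("url")))

// Filtering on the UDF-derived column hits the duplicate-execution behaviour
// described above, i.e. each file can end up being downloaded twice.
withContent.filter("content <> '{}'")
           .write.parquet("hdfs:///tmp/output")       // hypothetical path
{code}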






> UDF function executed twice when filter on new column created by withColumn
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>         Environment: spark 1.6.1
>            Reporter: Linbo
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


