spark-issues mailing list archives

From "Linbo (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-13773) UDF being applied to filtered data
Date Mon, 23 May 2016 07:42:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296052#comment-15296052 ]

Linbo commented on SPARK-13773:
-------------------------------

See this PR: https://github.com/apache/spark/pull/13087

> UDF being applied to filtered data 
> -----------------------------------
>
>                 Key: SPARK-13773
>                 URL: https://issues.apache.org/jira/browse/SPARK-13773
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: James Hammerton
>
> Given the following code:
> {code:title=ReproduceSparkBug.scala|borderStyle=solid}
> import scala.reflect.runtime.universe
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.sql.types.DataTypes
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.SparkConf
> object ReproduceSparkBug {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local")
>       .setAppName("ReproduceSparkBug")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     val schema = StructType(Array(
>       StructField("userId", DataTypes.StringType),
>       StructField("objectId", DataTypes.StringType),
>       StructField("eventName", DataTypes.StringType),
>       StructField("eventJson", DataTypes.StringType),
>       StructField("timestamp", DataTypes.LongType)))
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val df = sqlContext.read.format("com.databricks.spark.csv")
>       .option("delimiter", "\t")
>       .option("header", "false")
>       .schema(schema).load("src/test/resources/foo.txt")
>     val filtered = df.filter((df("eventName")).endsWith("Created"))
>     val extracted = filtered.select(filtered("objectId"),
>       extractorUDF(filtered("eventJson"), filtered("objectId"), filtered("userId")) as "info")
>     extracted.filter((extracted("info")).notEqual("NO REFS")).collect
>       .map(r => (r.getString(0), r.getString(1))).foreach(println(_))
>     sc.stop()
>   }
>   def extractorUDF = udf(extractor(_: String, _: String, _: String))
>   def extractor(eventJson: String, objectId: String, userId: String): String = {
>     println(eventJson + ":" + objectId + ":" + userId)
>     eventJson + ":" + objectId + ":" + userId
>   }
> }
> {code}
> If "src/test/resources/foo.txt" contains the following:
> {noformat}
> 113	0c38c6c327224e43a46f663b6424612f	Created	{"field":"value"}	1000
> 113	0c38c6c327224e43a46f663b6424612f	LabelRemoved	{"this":"should not appear"}	1000
> {noformat}
> Then the code prints the following to stdout:
> {noformat}
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"this":"should not appear"}:0c38c6c327224e43a46f663b6424612f:113
> (0c38c6c327224e43a46f663b6424612f,{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113)
> {noformat}
> If the first filter is cached (i.e. we write
> 'val filtered = df.filter((df("eventName")).endsWith("Created")).cache'),
> then only the first and last lines appear.
> What I think is happening is that the UDF is applied to the unfiltered data, and the
> filter is applied only afterwards, so the final output is still correct. The UDF also seems
> to be applied more than once to the rows that pass the filter, for some reason.
> This caused problems in my original code, where some JSON parsing done in the UDF
> threw exceptions because it was applied to data that should have been filtered out.
> The original code read from Parquet, but I switched to a tab-separated format here
> to make things easier to see and post.
> I suspect the bug has not been found until now because the correct results are produced
> in the end; the UDF would need to cause task failures when applied to the filtered-out data
> for people to notice.
> Note that I could not reproduce this unless the data was read in from a file. 
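
Until the behaviour reported above is resolved, one possible workaround is to make the function behind the UDF total, so that it returns a sentinel instead of throwing when it sees a row that should have been filtered out. The sketch below is plain Scala with no Spark dependency. `DefensiveExtractor`, `parseField` and `safeExtractor` are hypothetical names, and `parseField` is only a stand-in for the real JSON parsing, which the report does not show. The "NO REFS" sentinel is taken from the filter in the reproduction code.

```scala
import scala.util.Try

object DefensiveExtractor {
  // Hypothetical stand-in for the JSON parsing done in the original UDF:
  // it throws on any input that does not look like a JSON object.
  def parseField(eventJson: String): String = {
    require(eventJson != null && eventJson.trim.startsWith("{"), "not a JSON object")
    eventJson.trim
  }

  // Total variant of the extractor: any non-fatal failure in parsing is
  // mapped to the "NO REFS" sentinel that the later filter already removes.
  def safeExtractor(eventJson: String, objectId: String, userId: String): String =
    Try(parseField(eventJson))
      .map(json => json + ":" + objectId + ":" + userId)
      .getOrElse("NO REFS")

  def main(args: Array[String]): Unit = {
    println(safeExtractor("{\"field\":\"value\"}", "0c38", "113"))
    println(safeExtractor("not json", "0c38", "113"))
    println(safeExtractor(null, "0c38", "113"))
  }
}
```

Assuming the same three-string signature as in the report, such a function could be wrapped with `udf(DefensiveExtractor.safeExtractor(_: String, _: String, _: String))`. This only prevents task failures; it does not stop the UDF from being evaluated on rows that the filter would remove.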




