spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tony Jin <linbojin...@gmail.com>
Subject dataframe udf functioin will be executed twice when filter on new column created by withColumn
Date Wed, 11 May 2016 13:55:08 GMT
Hi guys,

I have a problem about spark DataFrame. My spark version is 1.6.1.
Basically, i used udf and df.withColumn to create a "new" column, and then
i filter the values on this new columns and call show(action). I see the
udf function (which is used to by withColumn to create the new column) is
called twice(duplicated). And if filter on "old" column, udf only run once
which is expected. I attached the example codes, line 30~38 shows the
problem.

 Anyone knows the internal reason? Can you give me any advices? Thank you
very much.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+



Best wishes.
By Linbo

Mime
View raw message