spark-issues mailing list archives

From "Andrew Snare (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
Date Mon, 27 Nov 2017 21:14:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267567#comment-16267567 ]

Andrew Snare commented on SPARK-22541:
--------------------------------------

Although this ship has sailed, a search brought me here while looking into this issue. Even
though the query optimisation is intended and its behaviour is known, it still doesn't
feel right, because it can produce surprising results.

Given:
{code}
>>> recip = sf.udf(lambda x: 1 / x, FloatType())
{code}

This fails:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...      .select(sf.when(sf.col('value') > 0, recip('value'))).show()
[…]
ZeroDivisionError: float division by zero
{code}

This succeeds:
{code}
>>> spark.createDataFrame([[0.0], [1.0]], ['value']) \
...      .select(sf.when(sf.col('value') > 0, 1 / sf.col('value'))).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}

The Scala equivalents of _both_ succeed:
{code}
scala> val recip = udf((x: Float) => 1 / x)
scala> Seq(0.0, 1.0).toDF.select(when('value > 0, recip('value))).show()
+-----------------------------------------+
|CASE WHEN (value > 0) THEN UDF(value) END|
+-----------------------------------------+
|                                     null|
|                                      1.0|
+-----------------------------------------+
scala> Seq(0.0, 1.0).toDF.select(when('value > 0, lit(1) / 'value)).show()
+------------------------------------------+
|CASE WHEN (value > 0) THEN (1 / value) END|
+------------------------------------------+
|                                      null|
|                                       1.0|
+------------------------------------------+
{code}
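
One way to avoid the PySpark failure shown above, regardless of when the UDF is evaluated relative to the {{when}} condition, would be to put the guard inside the Python function itself. The following is a minimal sketch, not a fix from the issue: {{safe_recip}} is a hypothetical name, and the wrapping shown in the trailing comment assumes the same {{sf}} and {{FloatType}} imports as the {{recip}} definition above.

```python
from typing import Optional


def safe_recip(x: Optional[float]) -> Optional[float]:
    """Reciprocal that returns None (SQL NULL) instead of raising.

    Hypothetical helper: the guard lives inside the function, so it does
    not depend on an enclosing `when` keeping zero or NULL inputs away
    from the Python UDF.
    """
    if x is None or x == 0:
        return None
    return 1 / x


# Assumed wrapping, mirroring the `recip` definition in the comment above:
#   recip = sf.udf(safe_recip, FloatType())
#   df.select(sf.when(sf.col('value') > 0, recip('value')))
```

With this guard the function never divides by zero, so the outcome no longer depends on whether the UDF is evaluated only for rows that satisfy the condition.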


> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Documentation
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>            Assignee: Liang-Chi Hsieh
>             Fix For: 2.3.0
>
>
> I'm using udf filters and accumulators to keep track of filtered rows in dataframes.
> If I apply multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators I'm using to keep track of filtered data.

> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>     return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}
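
The counts reported in the example above can be reproduced without Spark. The following is only a plain-Python model of the two evaluation orders, not Spark's execution code: one-element lists stand in for the accumulators, and {{make_filter}}, {{run_sequential}} and {{run_combined}} are hypothetical names. It assumes that, in the observed behaviour, both UDFs are evaluated for every input row before the filter results are combined.

```python
rows = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]


def make_filter(counter):
    """Build a filter like __myfilter; `counter` is a one-element list
    standing in for a Spark accumulator."""
    def keep(val):
        if val < 2:
            return True
        counter[0] += 1  # count every rejected value
        return False
    return keep


def run_sequential(rows):
    """The order the reporter expected: filter 2 sees only survivors of filter 1."""
    acc1, acc2 = [0], [0]
    f1, f2 = make_filter(acc1), make_filter(acc2)
    survivors = [r for r in rows if f1(r[1])]
    survivors = [r for r in survivors if f2(r[2])]
    return survivors, acc1[0], acc2[0]


def run_combined(rows):
    """Model of the observed behaviour: both filters run on every row."""
    acc1, acc2 = [0], [0]
    f1, f2 = make_filter(acc1), make_filter(acc2)
    survivors = []
    for r in rows:
        # Both functions are called up front, with no short-circuiting.
        keep1 = f1(r[1])
        keep2 = f2(r[2])
        if keep1 and keep2:
            survivors.append(r)
    return survivors, acc1[0], acc2[0]


print(run_sequential(rows))  # ([('a', 1, 1)], 2, 0)
print(run_combined(rows))    # ([('a', 1, 1)], 2, 2)
```

Both orders keep the same row, so the filtered dataframe looks correct either way; only the side-effecting counters reveal the difference, which matches the {{acc2}} value of 2 reported above.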


