spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22541) Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
Date Mon, 20 Nov 2017 07:19:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258886#comment-16258886 ]

Apache Spark commented on SPARK-22541:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19787

> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>
> I'm using UDF filters and accumulators to keep track of filtered rows in DataFrames.
> When I apply multiple filters one after the other, they seem to be executed in parallel rather than in sequence, which corrupts the counts in the accumulators I'm using to keep track of filtered data.

> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>     return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}
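
The reported numbers can be reproduced without Spark by a small plain-Python model. This is only a sketch of one possible explanation, namely that both UDFs are evaluated on every input row before the combined predicate is applied; the report itself does not confirm that this is what Spark does. The helper names (make_filter, run_sequential, run_batched) and the plain dict standing in for the accumulators are hypothetical and exist only for this illustration.

```python
# Plain-Python model of two ways a pair of stacked filters could be evaluated.
# No Spark is involved; a dict of counters stands in for the accumulators.

def make_filter(counts, name):
    """Build a predicate that counts every rejected value under `name`."""
    def predicate(val):
        if val < 2:
            return True
        counts[name] += 1  # side effect, like acc.add(1) in the report
        return False
    return predicate

def run_sequential(rows, p1, p2):
    """Second predicate only sees the rows kept by the first one."""
    kept = [r for r in rows if p1(r[1])]
    return [r for r in kept if p2(r[2])]

def run_batched(rows, p1, p2):
    """ASSUMED behavior: both predicates run on every row, then results are ANDed."""
    res1 = [p1(r[1]) for r in rows]
    res2 = [p2(r[2]) for r in rows]
    return [r for r, a, b in zip(rows, res1, res2) if a and b]

# Same data as in the report: (key, val1, val2)
rows = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]

seq_counts = {"acc1": 0, "acc2": 0}
seq_rows = run_sequential(
    rows, make_filter(seq_counts, "acc1"), make_filter(seq_counts, "acc2"))

batch_counts = {"acc1": 0, "acc2": 0}
batch_rows = run_batched(
    rows, make_filter(batch_counts, "acc1"), make_filter(batch_counts, "acc2"))

print("sequential:", seq_rows, seq_counts)
print("batched:   ", batch_rows, batch_counts)
```

Both strategies keep the same single row ("a", 1, 1), so the filtered output looks correct either way. Only the counters differ: the sequential model gives acc1 = 2 and acc2 = 0 (the expected values), while the batched model gives acc1 = 2 and acc2 = 2, which matches the values observed in the report.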



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


