spark-issues mailing list archives

From "Josh Rosen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-6551) Incorrect aggregate results if seqOp(...) mutates its first argument
Date Sun, 31 May 2015 21:10:19 GMT

    [ https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14566736#comment-14566736 ]

Josh Rosen commented on SPARK-6551:
-----------------------------------

I'd love to help diagnose/fix this issue, but I'm a bit swamped with other work. Can someone
open a PR that adds a failing regression test for this, or post such a diff as a gist? Once
we have that, it should be quick to fix. Thanks!
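As a possible starting point for such a regression test, here is a pure-Python model (no Spark needed) that reproduces all four results reported in the quoted issue below. It rests on an assumption that the report does not state: that in PySpark 1.3 `aggregate` runs its per-partition `seqOp` loop and the per-partition loop of a subsequent `fold` pipelined in one task, with both closures serialized together, so that after deserialization they share a single copy of the zero value. `run_pipelined_task`, `aggregate_model`, and the two-way split of `range(10)` are hypothetical choices made for illustration, not PySpark code.

```python
import pickle
from functools import reduce


# The four functions from the report.
def inc_mutate(counter, item):
    counter[0] += 1
    return counter


def inc_pure(counter, item):
    return [counter[0] + 1]


def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1


def merge_pure(c1, c2):
    return [c1[0] + c2[0]]


def run_pipelined_task(partition, zero, seq_op, comb_op):
    # ASSUMPTION (hypothetical model): both stages are shipped as one pickle,
    # so the task sees ONE shared deserialized copy of the zero value.
    shared_zero = pickle.loads(pickle.dumps(zero))
    # Stage 1: aggregate's per-partition loop; a mutating seq_op updates
    # shared_zero in place.
    acc = shared_zero
    for item in partition:
        acc = seq_op(acc, item)
    # Stage 2: fold's per-partition loop over stage 1's single output,
    # starting from the same (possibly already mutated) shared_zero.
    folded = shared_zero
    for value in [acc]:
        folded = comb_op(value, folded)
    return folded


def aggregate_model(partitions, zero, seq_op, comb_op):
    # Driver side: merge per-task results, starting from the caller's zero object.
    task_results = [run_pipelined_task(p, zero, seq_op, comb_op) for p in partitions]
    return reduce(comb_op, task_results, zero)


# Hypothetical two-way split of range(10).
partitions = [list(range(0, 5)), list(range(5, 10))]
for seq_op, comb_op in [(inc_pure, merge_pure), (inc_mutate, merge_pure),
                        (inc_pure, merge_mutate), (inc_mutate, merge_mutate)]:
    init = [0]
    result = aggregate_model(partitions, init, seq_op, comb_op)
    print(seq_op.__name__, comb_op.__name__, result, init)
```

Under this model a mutating `seqOp` leaves each partition's count both in the accumulator and in the shared zero value, so the fold stage counts it twice and the result is `[20]` for any partitioning of the ten elements.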

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
>                 Key: SPARK-6551
>                 URL: https://issues.apache.org/jira/browse/SPARK-6551
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>         Environment: Amazon EMR, AMI version 3.5
>            Reporter: Jarno Seppanen
>
> Python RDD.aggregate method doesn't match its documentation w.r.t. seqOp mutating its first argument:
> * the results are incorrect if seqOp mutates its first argument
> * additionally, the zero value is modified if combOp mutates its first argument (this is slightly surprising; it would be nice to document)
> I'm aggregating the RDD into a nontrivial data structure, and it would be wasteful to copy the whole data structure into a new instance in every seqOp, so mutation is an important feature.
> I'm seeing the following behavior:
> {code}
> def inc_mutate(counter, item):
>     counter[0] += 1
>     return counter
> def inc_pure(counter, item):
>     return [counter[0] + 1]
> def merge_mutate(c1, c2):
>     c1[0] += c2[0]
>     return c1
> def merge_pure(c1, c2):
>     return [c1[0] + c2[0]]
> # correct answer, when neither function mutates their arguments
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
> # [10]
> init
> # [0]
> # incorrect answer if seqOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
> # [20] <- WRONG
> init
> # [0]
> # zero value is modified if combOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
> # [10]
> init
> # [10]
> # for completeness
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
> # [20]
> init
> # [20]
> {code}
> I'm running on an EMR cluster launched with:
> {code}
> aws emr create-cluster --name jarno-spark \
>  --ami-version 3.5 \
>  --instance-type c3.8xlarge \
>  --instance-count 5 \
>  --ec2-attributes KeyName=foo \
>  --applications Name=Ganglia \
>  --log-uri s3://foo/log \
>  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
> {code}
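One way to make a mutating `seqOp` safe is sketched below as a hypothetical pure-Python model, not as PySpark's actual code: give each partition its own copy of the zero value, then merge the per-partition accumulators once on the driver, rather than (as assumed for 1.3) passing them into a pipelined `fold` that reuses the same zero object. `aggregate_fixed_model` and the partition split are illustrative assumptions. The zero value is copied once per partition, not once per `seqOp` call, so `seqOp` can still update a large accumulator in place, which addresses the reporter's concern about wasteful copying.

```python
import pickle
from functools import reduce


def inc_mutate(counter, item):
    counter[0] += 1
    return counter


def merge_pure(c1, c2):
    return [c1[0] + c2[0]]


def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1


def aggregate_fixed_model(partitions, zero, seq_op, comb_op):
    def run_partition(partition):
        # Each partition starts from its own copy of zero, and nothing else in
        # the task holds a reference to it, so in-place seq_op updates are safe.
        acc = pickle.loads(pickle.dumps(zero))
        for item in partition:
            acc = seq_op(acc, item)
        return acc

    # "Collect" the per-partition accumulators, then merge them once on the driver.
    per_partition = [run_partition(p) for p in partitions]
    # The merge starts from the caller's zero object itself, so a mutating
    # comb_op still updates it in place.
    return reduce(comb_op, per_partition, zero)


partitions = [list(range(0, 5)), list(range(5, 10))]  # hypothetical split
init = [0]
print(aggregate_fixed_model(partitions, init, inc_mutate, merge_pure), init)    # [10] [0]
init = [0]
print(aggregate_fixed_model(partitions, init, inc_mutate, merge_mutate), init)  # [10] [10]
```

The model keeps the second behavior from the report: because the driver-side merge starts from the caller's `init`, a mutating `combOp` still changes it. Passing `copy.deepcopy(init)` as the zero value leaves the caller's object untouched.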



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
