spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Dan Brown (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-10685) Misaligned data with RDD.zip and DataFrame.withColumn after repartition
Date Fri, 18 Sep 2015 02:49:04 GMT
Dan Brown created SPARK-10685:
---------------------------------

             Summary: Misaligned data with RDD.zip and DataFrame.withColumn after repartition
                 Key: SPARK-10685
                 URL: https://issues.apache.org/jira/browse/SPARK-10685
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.5.0, 1.4.1, 1.3.0
         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
- Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
            Reporter: Dan Brown


Here's a weird behavior where {{RDD.zip}} or {{DataFrame.withColumn}} after a {{repartition}}
produces "misaligned" data, meaning different column values in the same row aren't matched,
as if a zip shuffled the collections before zipping them. It's difficult to reproduce because
it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one
case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop),
and 1.5.0 (bin-without-hadoop).

Here's the most similar issue I was able to find. It appears to not have been repro'd and
then closed optimistically, and it smells like it could have been the same underlying cause
that was never fixed:

- https://issues.apache.org/jira/browse/SPARK-9131

Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying to build it
ourselves when we ran into this problem. Let me put in my vote for reopening the issue and
supporting {{DataFrame.zip}} in the standard lib.

- https://issues.apache.org/jira/browse/SPARK-7460

h3. Brief repro

Fail: withColumn(udf) after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
[r for r in df.collect() if r.a != r.b][:3] # Should be []
{code}

Sample outputs (nondeterministic):
{code}
[Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
[]
[Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
[Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
{code}

Fail: RDD.zip after DataFrame.repartition
{code}
df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df  = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3] # Should be []
{code}

Sample outputs (nondeterministic):
{code}
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
{code}

Test setup:

- local\[8]: {{MASTER=local\[8]}}
- dist\[N]: 1 driver + 1 master + N workers

{code}
"Fail" tests pass?  cluster mode  spark version
----------------------------------------------------
yes                 local[8]      1.3.0-cdh5.4.5
no                  dist[4]       1.3.0-cdh5.4.5
yes                 local[8]      1.4.1
yes                 dist[1]       1.4.1
no                  dist[2]       1.4.1
no                  dist[4]       1.4.1
yes                 local[8]      1.5.0
yes                 dist[1]       1.5.0
no                  dist[2]       1.5.0
no                  dist[4]       1.5.0
{code}

h3. Detailed repro

Start `pyspark` and run these imports:
{code}
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StructType, StructField
{code}

Fail: withColumn(udf) after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(1)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.coalesce(10)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Ok: withColumn without udf
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', df.a)
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Ok: createDataFrame(RDD.map) instead of withColumn(udf)
{code}
df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df  = df.repartition(100)
rdd = df.map(lambda r: Row(a=r.a, b=r.a))
df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
{code}
df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df  = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}

Fail: RDD.zip after DataFrame.repartition
{code}
df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df  = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}

Fail: RDD.zip after RDD.repartition after 100 starting partitions

- Failure requires ≥3 workers (whether dist or pseudo-dist)

{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}

Ok: RDD.zip after RDD.repartition after 1 starting partition
{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}

Test setup:

- local\[8]: {{MASTER=local\[8]}}
- pseudo-dist\[N]: 1 driver + 1 master + N workers; master and workers all on same OS
- dist\[N]: 1 driver + 1 master + N workers; master and workers all on separate OS's
- Spark 1.3.0-cdh5.4.5 with dist\[4] didn't trip any of the {{withColumn}} failures, but did
trip the {{zip}} failures
- {{-}} indicates a configuration I didn't try

{code}
"Ok" tests pass?  "Fail" tests pass?        platform  cluster mode    spark version
----------------------------------------------------------------
yes               yes                       ubuntu    local[8]        1.3.0-cdh5.4.5
-                 -                         ubuntu    pseudo-dist[1]  1.3.0-cdh5.4.5
-                 -                         ubuntu    pseudo-dist[2]  1.3.0-cdh5.4.5
yes               no[zip], yes[withColumn]  ubuntu    dist[4]         1.3.0-cdh5.4.5
yes               yes                       osx       local[8]        1.4.1
yes               yes                       ubuntu    local[8]        1.4.1
yes               yes                       osx       pseudo-dist[1]  1.4.1
-                 -                         ubuntu    pseudo-dist[1]  1.4.1
yes               no                        osx       pseudo-dist[2]  1.4.1
-                 -                         ubuntu    pseudo-dist[2]  1.4.1
-                 -                         osx       dist[4]         1.4.1
yes               no                        ubuntu    dist[4]         1.4.1
yes               yes                       osx       local[8]        1.5.0
yes               yes                       ubuntu    local[8]        1.5.0
yes               yes                       osx       pseudo-dist[1]  1.5.0
yes               yes                       ubuntu    pseudo-dist[1]  1.5.0
yes               no                        osx       pseudo-dist[2]  1.5.0
yes               no                        ubuntu    pseudo-dist[2]  1.5.0
-                 -                         osx       dist[4]         1.5.0
yes               no                        ubuntu    dist[4]         1.5.0
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message