spark-issues mailing list archives

From "Mateusz Jukiewicz (JIRA)" <>
Subject [jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results
Date Sun, 12 Aug 2018 23:33:00 GMT


Mateusz Jukiewicz commented on SPARK-23298:


I edited the issue description and added a reproducible example which you can try out. Please
keep in mind that it might take several Spark sessions of "distinct counting" to observe the issue.

On the other hand, I tried and could not reproduce the issue on Spark 2.3.1. Therefore, the aforementioned
SPARK-23207 may have fixed this one as well. Ideally, you would reproduce it on Spark 2.2 and
confirm that SPARK-23207 fixes this one too. But if you guys don't have time for that, I'm fine
with closing this one right away.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>                 Key: SPARK-23298
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion -r 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
>              Labels: Correctness, CorrectnessBug, correctness
> This is what happens (EDIT: I managed to get a reproducible example):
> {code:java}
> /* Example spark-shell start command:
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m
> // spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> import org.apache.spark.sql.functions.rand // not auto-imported by spark-shell
> // Two random long columns, each with values in 0..999
> val df = spark.range(10000000).withColumn("col1", (rand() * 1000).cast("long")).withColumn("col2", (rand() * 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256
> df.distinct.count
> // res2: Long = 999955   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The strings contain alphanumeric characters as well as colons and spaces. No line exceeds 1200 characters. I don't think that's important, though, as the issue appeared on various other datasets; I just tried to narrow it down to the simplest possible case.- (the case is now fully reproducible with the above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both Spark 2.2 and Spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. a laptop) in Spark local mode.
>  * It seems that once the correct count is computed, the issue cannot be reproduced in the same Spark session. In other words, I was able to get 2-3 incorrect distinct.count results consecutively, but once it returned the right value, it kept returning the correct value. I had to restart spark-shell to observe the problem again.
>  * The issue appears on both DataFrame and Dataset (i.e., using read.text and read.textFile, respectively).
>  * The issue is not reproducible on RDD (i.e., dataset.rdd.distinct.count).
>  * Not a single container failed in any of those invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e., _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e., there's only one copy of each file block on HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always able to read all input records correctly (as shown in the UI), and the record count changed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the internal way Encoders hash values. My reasons for thinking so:
>  ** in a simple `distinct.count` invocation, there are three hash-related stages in total (called `HashAggregate`),
>  ** an excerpt from the scaladoc for the `distinct` method says:
> {code:java}
>    * @note Equality checking is performed directly on the encoded representation of the data
>    * and thus is not affected by a custom `equals` function defined on `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This is greater than 2000, which means that a different data structure (namely _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) is used for bookkeeping during the shuffle. Unfortunately, the problem still occurs after decreasing the number below this threshold.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * Another suspicion of mine was that the issue is somehow related to the number of blocks on HDFS (974). I was able to reproduce the problem with both fewer and more partitions than this value, so I don't think this is the case.
>  * Final note: it looks like, for some reason, the data gets duplicated during the data exchange in the shuffle (because the shuffle read sees more records than the shuffle write has written).
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the web. The only thing I found was a thread on the Spark mailing list where someone using PySpark found that one of their executors was hashing things differently from the other one, which caused a similar issue.
> I didn't include a reproducible example, as this is just a long file with strings and, since this occurred on many different datasets, I doubt it's data-related. If an example is necessary, though, please let me know and I will try to prepare one.
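A quick sanity check on the reproducible example in the description: `rand()` returns values in [0, 1), so each of `col1` and `col2` takes one of 1000 values (0 to 999), and at most 1000 * 1000 = 1,000,000 distinct pairs can exist. That alone shows `res1` (1001256) is wrong. Assuming the two columns are independent and uniformly distributed (an assumption, not something the report states), the expected number of distinct pairs among N = 10,000,000 rows is K(1 - (1 - 1/K)^N) with K = 1,000,000, which comes out to about 999,955, in line with `res2`. The following plain-Scala sketch computes both numbers; the object and member names are hypothetical, not part of Spark.

```scala
// Hypothetical helper, not part of Spark: checks a distinct count from the
// reproducible example against the number of possible (col1, col2) pairs.
object DistinctCountSanityCheck {
  val rows: Long = 10000000L        // spark.range(10000000)
  val valuesPerColumn: Long = 1000L // (rand() * 1000).cast("long") yields 0..999
  val maxDistinct: Long = valuesPerColumn * valuesPerColumn

  // Expected number of occupied cells when `rows` pairs fall uniformly and
  // independently into `maxDistinct` cells (assumption: independent, uniform columns).
  val expectedDistinct: Double =
    maxDistinct * (1.0 - math.pow(1.0 - 1.0 / maxDistinct, rows.toDouble))

  // A distinct count above the number of possible pairs is necessarily wrong.
  def isPossible(count: Long): Boolean = count >= 0 && count <= maxDistinct

  def main(args: Array[String]): Unit = {
    for (count <- Seq(1001256L, 999955L))
      println(s"$count possible: ${isPossible(count)}")
    println(f"expected distinct pairs: $expectedDistinct%.1f")
  }
}
```

Running `main` reports 1001256 as impossible and 999955 as possible. The upper bound needs no distributional assumption; only the expected value does.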
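The shuffle numbers quoted from the UI can be reconciled directly. In the correct execution, the second stage reads exactly as many records as the first stage wrote, whereas the incorrect execution reads 24020150 - 24014227 = 5923 more records than were written. The sketch below encodes that invariant together with the partition-count rule the description mentions (more than 2000 partitions means _HighlyCompressedMapStatus_, otherwise _CompressedMapStatus_). It is plain Scala with hypothetical names and a simplified model of that rule, not Spark's actual `MapStatus` code or any Spark API.

```scala
// Hypothetical diagnostics, not a Spark API: compare shuffle metrics copied
// from the Spark UI and model which map status class the report refers to.
object ShuffleDiagnostics {
  // Records the reduce side read beyond what the map side wrote
  // (0 for a healthy shuffle; positive means duplicated records).
  def excessRecords(written: Long, read: Long): Long = read - written

  // Simplified model of the rule described in the report: more than
  // `threshold` partitions switches to HighlyCompressedMapStatus.
  def mapStatusKind(numPartitions: Int, threshold: Int = 2000): String =
    if (numPartitions > threshold) "HighlyCompressedMapStatus"
    else "CompressedMapStatus"

  def main(args: Array[String]): Unit = {
    println(s"correct run excess:   ${excessRecords(24014227L, 24014227L)}")
    println(s"incorrect run excess: ${excessRecords(24014227L, 24020150L)}")
    println(s"2154 partitions -> ${mapStatusKind(2154)}")
  }
}
```

Running `main` reports 0 extra records for the correct execution and 5923 for the incorrect one. As the report notes, lowering the partition count below the threshold did not make the problem go away, so the map status class is not sufficient on its own to explain the duplication.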

This message was sent by Atlassian JIRA
