spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (SPARK-18934) Writing to dynamic partitions does not preserve sort order if spill occurs
Date Tue, 20 Dec 2016 02:38:58 GMT

     [ https://issues.apache.org/jira/browse/SPARK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Apache Spark reassigned SPARK-18934:
------------------------------------

    Assignee:     (was: Apache Spark)

> Writing to dynamic partitions does not preserve sort order if spill occurs
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18934
>                 URL: https://issues.apache.org/jira/browse/SPARK-18934
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Junegunn Choi
>
> When writing to dynamic partitions, the task sorts the input data by the partition key
(also with bucket key if used), so that it can write to one partition at a time using a single
writer. And if spill occurs during the process, {{UnsafeSorterSpillMerger}} is used to merge
partial sequences of data.
> However, the merge process only considers the partition key, so that the sort order within
a partition specified via {{sortWithinPartitions}} or {{SORT BY}} is not preserved.
> We can reproduce the problem on Spark shell. Make sure to start shell in local mode with
small driver memory (e.g. 1G) so that spills occur.
> {code}
> // FileFormatWriter
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").format("orc").partitionBy("part")
>   .saveAsTable("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> {noformat}
> +-------+----+
> |  value|part|
> +-------+----+
> |      2|   0|
> |8388610|   0|
> |      4|   0|
> |8388612|   0|
> |      6|   0|
> |8388614|   0|
> |      8|   0|
> |8388616|   0|
> |     10|   0|
> |8388618|   0|
> |     12|   0|
> |8388620|   0|
> |     14|   0|
> |8388622|   0|
> |     16|   0|
> |8388624|   0|
> |     18|   0|
> |8388626|   0|
> |     20|   0|
> |8388628|   0|
> +-------+----+
> {noformat}
> We can confirm that the issue using orc dump.
> {noformat}
> > java -jar orc-tools-1.3.0-SNAPSHOT-uber.jar meta -d part=0/part-r-00000-96c022f0-a173-40cc-b2e5-9d02fed4213e.snappy.orc
| head -20
> {"value":2}
> {"value":8388610}
> {"value":4}
> {"value":8388612}
> {"value":6}
> {"value":8388614}
> {"value":8}
> {"value":8388616}
> {"value":10}
> {"value":8388618}
> {"value":12}
> {"value":8388620}
> {"value":14}
> {"value":8388622}
> {"value":16}
> {"value":8388624}
> {"value":18}
> {"value":8388626}
> {"value":20}
> {"value":8388628}
> {noformat}
> {{SparkHiveDynamicPartitionWriterContainer}} has the same problem.
> {code}
> // Insert into an existing Hive table with dynamic partitions
> //   CREATE TABLE TEST_SORT_WITHIN (VALUE INT) PARTITIONED BY (PART INT) STORED AS ORC
> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
> sc.parallelize(1 to 10000000).toDS.withColumn("part", 'value.mod(2))
>   .repartition(1, 'part).sortWithinPartitions("value")
>   .write.mode("overwrite").insertInto("test_sort_within")
> spark.read.table("test_sort_within").show
> {code}
> I was able to fix the problem by appending a numeric index column to the sorting key
which effectively makes the sort stable. I'll create a pull request on GitHub but since I'm
not really familiar with the internals of Spark, I'm not sure if my approach is valid or idiomatic.
So please let me know if there are better ways to handle this, or if you want to address the
issue differently.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message