From: "Cheng Lian (JIRA)"
To: issues@spark.apache.org
Date: Wed, 17 Jun 2015 08:23:02 +0000 (UTC)
Subject: [jira] [Updated] (SPARK-8406) Race condition when writing Parquet files

     [ https://issues.apache.org/jira/browse/SPARK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-8406:
------------------------------
    Description:

To support appending, the Parquet data source tries to find the maximum ID among the part-files already in the destination directory (the ID in the output file name "part-r-<id>.gz.parquet") at the beginning of the write job. In 1.3.0, this step happens on the driver side before any files are written. In 1.4.0, however, it was moved to the task side. As a result, tasks scheduled later may see a max ID that already reflects files newly written by tasks that finished earlier within the same job. This is a race condition. In most cases it only produces nonconsecutive IDs in output file names, but when the DataFrame contains thousands of RDD partitions, it becomes likely that two tasks pick the same ID, so one output file gets overwritten by the other.

The data-loss case is not easy to reproduce, but the following Spark shell snippet reproduces the nonconsecutive output file IDs:

{code}
sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
{code}

"16" can be replaced with any integer greater than the default parallelism on your machine (usually the number of cores; on my machine it's 8).
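For illustration only, here is a minimal sketch of why choosing the ID on the task side races. The helper names and the exact formula are assumptions made for this sketch, not the actual Spark code path: every task scans the destination directory when it starts, so tasks that start before earlier tasks have committed their files derive IDs from a stale maximum.

{code}
import java.io.File

// Hypothetical helper: find the max <id> among existing "part-r-<id>.gz.parquet" files.
def maxPartId(dir: File): Int = {
  val Pattern = """part-r-(\d+)\.gz\.parquet""".r
  val ids = Option(dir.listFiles()).getOrElse(Array.empty[File]).map(_.getName).collect {
    case Pattern(id) => id.toInt
  }
  if (ids.isEmpty) 0 else ids.max
}

// Hypothetical task-side naming: the ID depends on what the directory contains
// at task start, so different waves of tasks can compute gapped or colliding IDs.
def outputFileFor(dir: File, partitionId: Int): String = {
  val id = maxPartId(dir) + partitionId + 1  // racy: dir changes as other tasks commit
  f"part-r-$id%05d.gz.parquet"
}
{code}

The directory listing below is the output of the shell snippet above; note the jump from part-r-00008 to part-r-00017, which is exactly the kind of gap a task-side scan like this produces.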
{noformat}
-rw-r--r--   3 lian supergroup          0 2015-06-17 00:06 /user/lian/foo/_SUCCESS
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00001.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00002.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00003.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00004.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00005.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00006.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00007.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00008.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00017.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00018.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00019.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00020.gz.parquet
-rw-r--r--   3 lian supergroup        352 2015-06-17 00:06 /user/lian/foo/part-r-00021.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00022.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00023.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00024.gz.parquet
{noformat}

Notice that the newly added ORC data source doesn't suffer from this issue, because it uses both the task ID and {{System.currentTimeMillis()}} to generate the output file name.
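As a rough sketch of that approach (this mirrors the idea described above, not the exact ORC data source code), a name built from the task ID plus a timestamp never needs to scan the directory, so there is nothing to race on: the task ID distinguishes tasks within a job, and the timestamp distinguishes separate jobs appending to the same directory.

{code}
// Hypothetical illustration of collision-free naming without a directory scan.
def orcStyleFileName(taskId: Int): String =
  f"part-r-$taskId%05d-${System.currentTimeMillis()}.orc"

// e.g. orcStyleFileName(3) => "part-r-00003-<current millis>.orc"
{code}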
> Race condition when writing Parquet files
> -----------------------------------------
>
>                 Key: SPARK-8406
>                 URL: https://issues.apache.org/jira/browse/SPARK-8406
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>            Priority: Blocker
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)