spark-issues mailing list archives

From "Jiri Syrovy (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-8513) _temporary may be left undeleted when a write job committed with FileOutputCommitter fails due to a race condition
Date Fri, 04 Dec 2015 16:48:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041742#comment-15041742 ]

Jiri Syrovy edited comment on SPARK-8513 at 12/4/15 4:47 PM:
-------------------------------------------------------------

The same thing happens when writing partitioned files with coalesce(1) and spark.speculation
set to "false". I've tried multiple Hadoop versions (2.6 through 2.7.1) and multiple write
modes (Overwrite, Append, ...), but the result is almost always the same.
{code:java}
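// Write through a single task (coalesce(1)) and partition the Parquet output
// by templateId/definitionId or by definitionId only, appending to existing data.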
DataFrameWriter writer = df.coalesce(1).write().format("parquet");
writer = perTemplate ? writer.partitionBy("templateId", "definitionId")
                     : writer.partitionBy("definitionId");
writer.mode(SaveMode.Append).save(ConfConsts.STORAGE_PREFIX + location);
{code}

{noformat}
[2015-12-04 16:14:59,821] WARN  .apache.hadoop.fs.FileUtil [] [akka://JobServer/user/context-supervisor/CSCONTEXT]
- Failed to delete file or dir [/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918/definitionId=4/.part-r-00000-cee32e28-fa7c-43ec-bbe1-63b639deb395.snappy.parquet.crc]:
it still exists.
[2015-12-04 16:14:59,821] ERROR InsertIntoHadoopFsRelation [] [akka://JobServer/user/context-supervisor/CSCONTEXT]
- Aborting job.
java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918;
isDirectory=true; modification_time=1449245693000; access_time=0; owner=; group=; permission=rwxrwxrwx;
isSymlink=false} to file:/data/build/result_12349.parquet.stats/templateId=2918
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:397)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:388)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
{noformat}



> _temporary may be left undeleted when a write job committed with FileOutputCommitter fails due to a race condition
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8513
>                 URL: https://issues.apache.org/jira/browse/SPARK-8513
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.2.2, 1.3.1, 1.4.0
>            Reporter: Cheng Lian
>
> To reproduce this issue, we need a node with a relatively large number of cores, say 32 (e.g., the Spark Jenkins builder is a good candidate).  With such a node, the following code should reproduce the issue fairly easily:
> {code}
> sqlContext.range(0, 10).repartition(32).select('id / 0).write.mode("overwrite").parquet("file:///tmp/foo")
> {code}
> You may observe log lines similar to the following:
> {noformat}
> 01:58:27.682 pool-1-thread-1-ScalaTest-running-CommitFailureTestRelationSuite WARN FileUtil:
Failed to delete file or dir [/home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-a918b285-fa59-4a29-857e-a95e38fa355a/_temporary/0/_temporary]:
it still exists.
> {noformat}
> The reason is that, for a Spark job with multiple tasks, when a task fails after multiple retries, the job gets canceled on the driver side.  At the same time, all child tasks of this job also get canceled.
> However, task cancellation is asynchronous.  This means some tasks may still be running when the job has already been killed on the driver side.
> With this in mind, the following execution order may cause the log line mentioned above:
> # Job {{A}} spawns 32 tasks to write the Parquet file.
>   Since {{ParquetOutputCommitter}} is a subclass of {{FileOutputCommitter}}, a temporary directory {{D1}} is created to hold the output files of the different task attempts.
> # Task {{a1}} is the first to fail, after several retries, because of the division-by-zero error.
> # Task {{a1}} aborts the Parquet write task and tries to remove its task attempt output directory {{d1}} (a sub-directory of {{D1}}).
> # Job {{A}} gets canceled on the driver side; all the other 31 tasks also get canceled *asynchronously*.
> # {{ParquetOutputCommitter.abortJob()}} tries to remove {{D1}} by first removing all of its child files/directories.
>   Note that when testing with a local directory, {{RawLocalFileSystem}} simply calls {{java.io.File.delete()}} to perform the deletion, and only empty directories can be deleted (see the sketch after this list).
> # Because tasks are canceled asynchronously, some other task, say {{a2}}, may just get scheduled and create its own task attempt directory {{d2}} under {{D1}}.
> # Now {{ParquetOutputCommitter.abortJob()}} finally tries to remove {{D1}} itself, but fails because {{d2}} has made {{D1}} non-empty again.
> Notice that this bug affects all Spark jobs that write files with {{FileOutputCommitter}} and its subclasses, which create and delete temporary directories.
> One possible fix is to make task cancellation synchronous, but this would also increase latency.
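> To picture that fix (again a rough, hypothetical sketch rather than an actual Spark patch): the driver would wait until every task attempt has actually stopped before {{abortJob()}} removes {{D1}}, for example by joining the task threads first, so no new attempt directory can appear during the cleanup:
> {code:java}
> import java.io.File;
> import java.nio.file.Files;
> import java.util.ArrayList;
> import java.util.List;
> 
> public class SynchronousCancellation {
>     public static void main(String[] args) throws Exception {
>         File d1 = Files.createTempDirectory("D1-temporary").toFile();
> 
>         // Tasks that each create an attempt directory under D1.
>         List<Thread> tasks = new ArrayList<>();
>         for (int i = 0; i < 4; i++) {
>             final int attempt = i;
>             Thread t = new Thread(() -> new File(d1, "task_attempt_" + attempt).mkdir());
>             tasks.add(t);
>             t.start();
>         }
> 
>         // "Synchronous" cancellation: wait for every task to stop before cleaning up,
>         // at the cost of extra latency.
>         for (Thread t : tasks) {
>             t.join();
>         }
>         for (File child : d1.listFiles()) {
>             child.delete();
>         }
>         System.out.println("D1 deleted? " + d1.delete());  // true: no stragglers remain
>     }
> }
> {code}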


