spark-issues mailing list archives

From "Steve Loughran (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-10063) Remove DirectParquetOutputCommitter
Date Fri, 01 Apr 2016 11:06:25 GMT

    [ https://issues.apache.org/jira/browse/SPARK-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15221551#comment-15221551 ]

Steve Loughran commented on SPARK-10063:
----------------------------------------

I should add that since the default committer uses rename(), on some object stores (s3n,
swift) that rename is implemented as a client-side copy, while on s3a a server-side copy
happens. In either case the copy is followed by a recursive delete. So the FileOutputCommitter
is equally prone to race conditions, and it uses significantly more IO; rename() is likely to
take time O(data) rather than O(1). I'd go for the direct committer if you are planning to use
S3 as the direct output of an operation. With speculation, it's better to write to HDFS and
then copy the results over afterwards.
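The "write to HDFS, then copy" pattern might look like the sketch below. The paths, the bucket name, and the use of distcp for the final copy are assumptions for illustration, not something specified in this thread, and the sketch presumes a running SparkSession (`spark`) on a cluster with HDFS.

{code}
// Sketch only: stage job output on HDFS, where rename-based commit is atomic
// and O(1), so speculative attempts cannot corrupt the final output.
val df = spark.range(100).toDF("i")
df.write
  .mode("overwrite")
  .format("parquet")
  .save("hdfs:///tmp/staging/outputCommitter")   // hypothetical staging path

// Once the job has committed, copy the finished directory to S3 in one pass,
// e.g. from the shell (hypothetical bucket):
//   hadoop distcp hdfs:///tmp/staging/outputCommitter s3a://my-bucket/outputCommitter
{code}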

> Remove DirectParquetOutputCommitter
> -----------------------------------
>
>                 Key: SPARK-10063
>                 URL: https://issues.apache.org/jira/browse/SPARK-10063
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Yin Huai
>            Priority: Critical
>
> When we use DirectParquetOutputCommitter on S3 and speculation is enabled, there is a chance that we can lose data.
> Here is the code to reproduce the problem.
> {code}
> import org.apache.spark.sql.functions._
> val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask", (i: Int, partitionId: Int, attemptNumber: Int) => {
>   if (partitionId == 0 && i == 5) {
>     if (attemptNumber > 0) {
>       Thread.sleep(15000)
>       throw new Exception("new exception")
>     } else {
>       Thread.sleep(10000)
>     }
>   }
>   
>   i
> })
> val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
>   val context = org.apache.spark.TaskContext.get()
>   val partitionId = context.partitionId
>   val attemptNumber = context.attemptNumber
>   iter.map(i => (i, partitionId, attemptNumber))
> }.toDF("i", "partitionId", "attemptNumber")
> df
>   .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"), $"partitionId", $"attemptNumber")
>   .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
> sqlContext.read.load("/home/yin/outputCommitter").count
> // The result is 99 and 5 is missing from the output.
> {code}
> What happened is that the original task finishes first and uploads its output file to S3, then the speculative task somehow fails. Because we have to call the output stream's close method, which uploads data to S3, we actually upload the partial result generated by the failed speculative task to S3, and this file overwrites the correct file generated by the original task.
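The failure mode described above can be simulated with plain local files standing in for S3 object keys. This is a toy model of the race, not Spark's actual committer code; the file names and contents are invented for illustration.

```scala
import java.nio.file.Files

// A local file stands in for the S3 key both task attempts write to.
val dest = Files.createTempFile("part-00000", ".parquet")

// The original attempt finishes first and "uploads" its complete output.
Files.write(dest, "complete-output".getBytes("UTF-8"))

// The speculative attempt fails mid-task, but its output stream still gets
// closed, and close() is what pushes the buffered (partial) bytes to the
// store -- overwriting the good object under the same key.
Files.write(dest, "partial".getBytes("UTF-8"))

val result = new String(Files.readAllBytes(dest), "UTF-8")
println(result)  // prints "partial": the complete output is gone
Files.delete(dest)
```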



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

