spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-18672][CORE] Close recordwriter in SparkHadoopMapReduceWriter before committing
Date Tue, 06 Dec 2016 04:20:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 772ddbeaa -> b8c7b8d31


[SPARK-18672][CORE] Close recordwriter in SparkHadoopMapReduceWriter before committing

## What changes were proposed in this pull request?

Some APIs, such as `PairRDDFunctions.saveAsHadoopDataset()`, do not close the record
writer before issuing the commit for the task.

On Windows, the output file in the temp directory is still open when the output committer
tries to rename it from the temp directory to the output directory after writing finishes,
so the move fails. We should close the writer before committing
the task, as the other writers such as `FileFormatWriter` already do.
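
For reference, a minimal, self-contained sketch of the underlying OS behavior (JDK only; the object and file names here are made up for illustration): on Windows, `File.renameTo` typically fails while the file is still open, which is exactly what happens to the task attempt file during `commitTask`.

```
import java.io.{File, FileOutputStream}

object RenameWhileOpen extends App {
  val src = File.createTempFile("open-handle", ".tmp")
  val out = new FileOutputStream(src)
  out.write(1)

  val dst = new File(src.getParentFile, src.getName + ".renamed")
  // On Windows this typically prints false because the file is still open;
  // on POSIX systems the rename usually succeeds.
  println(s"rename while open: ${src.renameTo(dst)}")

  out.close()
  src.delete()
  dst.delete()
}
```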

The identified failure was as follows:

```
FAILURE! - in org.apache.spark.JavaAPISuite
writeWithNewAPIHadoopFile(org.apache.spark.JavaAPISuite)  Time elapsed: 0.25 sec  <<<
ERROR!
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
Caused by: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.SparkException:
Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:182)
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:100)
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:99)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not rename file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0
to file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:436)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:415)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:153)
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:167)
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:156)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
	at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:168)
	... 8 more
Driver stacktrace:
	at org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
Caused by: org.apache.spark.SparkException: Task failed while writing rows
Caused by: java.io.IOException: Could not rename file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0
to file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
```

This PR proposes to close the record writer before committing the task.
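
In outline, the fix moves `writer.close(taskContext)` ahead of `committer.commitTask(taskContext)` on the success path, and closes the writer before `abortTask` on the failure path. A simplified sketch of that ordering (not the actual Spark code; the object and method names are invented, and the real implementation goes through `Utils.tryWithSafeFinallyAndFailureCallbacks`):

```
import org.apache.hadoop.mapreduce.{OutputCommitter, RecordWriter, TaskAttemptContext}

object CloseBeforeCommit {
  def writeAndCommit[K, V](
      iterator: Iterator[(K, V)],
      writerInit: => RecordWriter[K, V],
      committer: OutputCommitter,
      taskContext: TaskAttemptContext): Unit = {
    var writer = writerInit
    try {
      while (iterator.hasNext) {
        val (key, value) = iterator.next()
        writer.write(key, value)
      }
      // Release the file handle first so the rename inside commitTask
      // can succeed on Windows.
      writer.close(taskContext)
      writer = null
      committer.commitTask(taskContext)
    } catch {
      case t: Throwable =>
        // On failure, still close the writer, then abort the task even if
        // the close itself throws.
        try {
          if (writer != null) writer.close(taskContext)
        } finally {
          committer.abortTask(taskContext)
        }
        throw t
    }
  }
}
```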

## How was this patch tested?

Manually tested via AppVeyor.

**Before**

https://ci.appveyor.com/project/spark-test/spark/build/94-scala-tests

**After**

https://ci.appveyor.com/project/spark-test/spark/build/93-scala-tests

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16098 from HyukjinKwon/close-wirter-first.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8c7b8d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8c7b8d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8c7b8d3

Branch: refs/heads/master
Commit: b8c7b8d31d77b937f8c43d7d1af78d92f2f417a4
Parents: 772ddbe
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Tue Dec 6 12:20:16 2016 +0800
Committer: Sean Owen <sowen@cloudera.com>
Committed: Tue Dec 6 12:20:16 2016 +0800

----------------------------------------------------------------------
 .../io/SparkHadoopMapReduceWriter.scala         | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8c7b8d3/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index aaeb3d0..6de1fc0 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -146,7 +146,7 @@ object SparkHadoopMapReduceWriter extends Logging {
       case c: Configurable => c.setConf(hadoopConf)
       case _ => ()
     }
-    val writer = taskFormat.getRecordWriter(taskContext)
+    var writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")
     var recordsWritten = 0L
@@ -154,6 +154,7 @@ object SparkHadoopMapReduceWriter extends Logging {
     // Write all rows in RDD partition.
     try {
       val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
+        // Write rows out, release resource and commit the task.
         while (iterator.hasNext) {
           val pair = iterator.next()
           writer.write(pair._1, pair._2)
@@ -163,12 +164,23 @@ object SparkHadoopMapReduceWriter extends Logging {
             outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
-
+        if (writer != null) {
+          writer.close(taskContext)
+          writer = null
+        }
         committer.commitTask(taskContext)
       }(catchBlock = {
-        committer.abortTask(taskContext)
-        logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
-      }, finallyBlock = writer.close(taskContext))
+        // If there is an error, release resource and then abort the task.
+        try {
+          if (writer != null) {
+            writer.close(taskContext)
+            writer = null
+          }
+        } finally {
+          committer.abortTask(taskContext)
+          logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
+        }
+      })
 
       outputMetricsAndBytesWrittenCallback.foreach {
         case (om, callback) =>



