spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-9899] [SQL] Disables customized output committer when speculation is on
Date Wed, 19 Aug 2015 21:27:31 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d9dfd43d4 -> b32a31d64


[SPARK-9899] [SQL] Disables customized output committer when speculation is on

Speculation does not play well with direct output committers: there are multiple corner cases
that may cause data corruption and/or data loss.

Please see this [PR comment] [1] for more details.

[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385
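
For illustration only, a minimal sketch of the situation this patch guards against, written
against the Spark 1.5 API. The committer class name and the way it is configured here are
assumptions for the example; the config key mirrors SQLConf.OUTPUT_COMMITTER_CLASS from the
diff below.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // Speculation means the same task may be launched twice concurrently.
  val conf = new SparkConf()
    .setAppName("speculation-committer-demo")
    .setMaster("local[2]")
    .set("spark.speculation", "true")

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Request a direct output committer (class name is illustrative). With this
  // patch, the write path ignores it whenever spark.speculation is true and
  // uses the output format's default committer instead.
  sc.hadoopConfiguration.set(
    "spark.sql.sources.outputCommitterClass",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  // Before the patch, two speculative attempts writing directly to the
  // destination could leave duplicate or partial files on failure.
  sqlContext.range(10).write.parquet("/tmp/speculation-demo")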

Author: Cheng Lian <lian@databricks.com>

Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.

(cherry picked from commit f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b32a31d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b32a31d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b32a31d6

Branch: refs/heads/branch-1.5
Commit: b32a31d64588f2fd3926f09482e2bfe968370fc0
Parents: d9dfd43
Author: Cheng Lian <lian@databricks.com>
Authored: Wed Aug 19 14:15:28 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Aug 19 14:26:11 2015 -0700

----------------------------------------------------------------------
 .../execution/datasources/WriterContainer.scala | 16 ++++++++-
 .../sql/sources/hadoopFsRelationSuites.scala    | 34 ++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b32a31d6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 427c399..11c97e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer(
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
 
+  private val speculationEnabled: Boolean =
+    relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
   // The following fields are initialized and used on both driver and executor side.
   @transient protected var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
@@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer(
       // associated with the file output format since it is not safe to use a custom
       // committer for appending. For example, in S3, direct parquet output committer may
      // leave partial data in the destination dir when the appending job fails.
+      //
+      // See SPARK-8578 for more details
       logInfo(
-        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}
" +
+        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName}
" +
           "for appending.")
       defaultOutputCommitter
+    } else if (speculationEnabled) {
+      // When speculation is enabled, it's not safe to use customized output committer classes,
+      // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
+      //
+      // See SPARK-9899 for more details.
+      logInfo(
+        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName}
" +
+          "because spark.speculation is configured to be true.")
+      defaultOutputCommitter
     } else {
       val committerClass = context.getConfiguration.getClass(
         SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])

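For readers skimming the hunk above, the resolution order is now: appends always use the default
committer (SPARK-8578), speculative jobs always use the default committer (SPARK-9899), and only
otherwise is a user-configured committer honored. A simplified standalone sketch of that order
(names are illustrative, not the exact Spark code):

  import org.apache.hadoop.mapreduce.OutputCommitter

  def resolveCommitter(
      isAppend: Boolean,
      speculationEnabled: Boolean,
      customCommitter: Option[OutputCommitter],
      defaultCommitter: OutputCommitter): OutputCommitter = {
    if (isAppend) {
      defaultCommitter // SPARK-8578: custom committers are unsafe for appends
    } else if (speculationEnabled) {
      defaultCommitter // SPARK-9899: custom committers are unsafe under speculation
    } else {
      customCommitter.getOrElse(defaultCommitter)
    }
  }
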
http://git-wip-us.apache.org/repos/asf/spark/blob/b32a31d6/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af44562..33d8730 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -553,6 +553,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("SPARK-9899 Disable customized output committer when speculation is on") {
+    val clonedConf = new Configuration(configuration)
+    val speculationEnabled =
+      sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
+    try {
+      withTempPath { dir =>
+        // Enables task speculation
+        sqlContext.sparkContext.conf.set("spark.speculation", "true")
+
+        // Uses a customized output committer which always fails
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+
+        // Code below shouldn't throw since customized output committer should be disabled.
+        val df = sqlContext.range(10).coalesce(1)
+        df.write.format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext
+            .read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df)
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when

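For context, the AlwaysFailOutputCommitter used by the new test (defined further down in this
suite, truncated here) throws at job commit time, so the test above passes only if the configured
custom committer is never actually invoked. A minimal sketch of such a committer; the exact
signatures are assumptions:

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

  class AlwaysFailOutputCommitter(outputPath: Path, context: TaskAttemptContext)
    extends FileOutputCommitter(outputPath, context) {

    // Fail loudly at job commit so any accidental use of this committer is caught.
    override def commitJob(context: JobContext): Unit = {
      sys.error("Intentional job commit failure")
    }
  }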
