spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x
Date Mon, 08 Jun 2015 18:34:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master ed5c2dccd -> bbdfc0a40


[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x

For Hadoop 1.x, the `TaskAttemptContext` constructor clones its `Configuration` argument, so
configurations made in `HadoopFsRelation.prepareJobForWrite()` are not propagated to the *driver*
side `TaskAttemptContext` (executor side configurations are properly populated). Currently
this should only affect Parquet output committer class configuration.

Author: Cheng Lian <lian@databricks.com>

Closes #6669 from liancheng/spark-8121 and squashes the following commits:

73819e8 [Cheng Lian] Minor logging fix
fce089c [Cheng Lian] Adds more logging
b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing
963a1aa [Cheng Lian] Addresses @yhuai's comment
c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbdfc0a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbdfc0a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbdfc0a4

Branch: refs/heads/master
Commit: bbdfc0a40fb39760c122e7b9ce80aa1e340e55ee
Parents: ed5c2dc
Author: Cheng Lian <lian@databricks.com>
Authored: Mon Jun 8 11:34:18 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Mon Jun 8 11:34:18 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  1 +
 .../apache/spark/sql/parquet/newParquet.scala   |  7 +++
 .../org/apache/spark/sql/sources/commands.scala | 18 +++++--
 .../spark/sql/parquet/ParquetIOSuite.scala      | 52 +++++++++++++++++---
 4 files changed, 65 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index c778889..be786f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -76,6 +76,7 @@ private[spark] object SQLConf {
 
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
 
   // Whether to perform eager analysis when constructing a dataframe.

http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5dda440..7af4eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
         classOf[ParquetOutputCommitter],
         classOf[ParquetOutputCommitter])
 
+    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+      logInfo("Using default output committer for Parquet: " +
+        classOf[ParquetOutputCommitter].getCanonicalName)
+    } else {
+      logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+    }
+
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,

http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index bd3aad6..c94199b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
     setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
-    // This preparation must happen before initializing output format and output committer, since
-    // their initialization involves the job configuration, which can be potentially decorated in
-    // `relation.prepareJobForWrite`.
+    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
+    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
+    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+    //
+    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+    // committer, since their initialization involve the job configuration, which can be potentially
+    // decorated in `prepareJobForWrite`.
     outputWriterFactory = relation.prepareJobForWrite(job)
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
       SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
       // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
       // has an associated output committer. To override this output committer,
       // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer(
     }.getOrElse {
       // If output committer class is not set, we will use the one associated with the
       // file output format.
-      outputFormatClass.newInstance().getOutputCommitter(context)
+      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+      outputCommitter
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 2b6a270..46b2585 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.scalatest.BeforeAndAfterAll
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.DateUtils
@@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNulls :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(5)(null): _*))
     }
   }
@@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNones :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(3)(null): _*))
     }
   }
@@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
   }
 
   test("SPARK-6352 DirectParquetOutputCommitter") {
+    val clonedConf = new Configuration(configuration)
+
     // Write to a parquet file and let it fail.
     // _temporary should be missing if direct output committer works.
     try {
@@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
         val fs = path.getFileSystem(configuration)
         assert(!fs.exists(path))
       }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
-    finally {
-      configuration.set("spark.sql.parquet.output.committer.class",
-        "org.apache.parquet.hadoop.ParquetOutputCommitter")
+  }
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+    withTempPath { dir =>
+      val clonedConf = new Configuration(configuration)
+
+      configuration.set(
+        SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+
+      configuration.set(
+        "spark.sql.parquet.output.committer.class",
+        classOf[BogusParquetOutputCommitter].getCanonicalName)
+
+      try {
+        val message = intercept[SparkException] {
+          sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        configuration.clear()
+        clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      }
     }
   }
 }
 
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
+
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message