spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-6352] [SQL] Custom parquet output committer
Date Tue, 28 Apr 2015 08:51:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master d94cd1a73 -> e13cd8656


[SPARK-6352] [SQL] Custom parquet output committer

Add a new config, "spark.sql.parquet.output.committer.class", to allow a custom Parquet output
committer, and add an output committer class intended specifically for use on S3.
Fix compilation error introduced by https://github.com/apache/spark/pull/5042.
Respect ParquetOutputFormat.ENABLE_JOB_SUMMARY flag.

Author: Pei-Lun Lee <pllee@appier.com>

Closes #5525 from ypcat/spark-6352 and squashes the following commits:

54c6b15 [Pei-Lun Lee] error handling
472870e [Pei-Lun Lee] add back custom parquet output committer
ddd0f69 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ece5c5 [Pei-Lun Lee] compatibility with hadoop 1.x
8413fcd [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
fe65915 [Pei-Lun Lee] add support for parquet config parquet.enable.summary-metadata
e17bf47 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ae7545 [Pei-Lun Lee] [SPARK-6352] [SQL] Change to allow custom parquet output committer.
0d540b9 [Pei-Lun Lee] [SPARK-6352] [SQL] add license
c42468c [Pei-Lun Lee] [SPARK-6352] [SQL] add test case
0fc03ca [Pei-Lun Lee] [SPARK-6352] [SQL] hide class DirectParquetOutputCommitter
769bd67 [Pei-Lun Lee] DirectParquetOutputCommitter
f75e261 [Pei-Lun Lee] DirectParquetOutputCommitter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e13cd865
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e13cd865
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e13cd865

Branch: refs/heads/master
Commit: e13cd86567a43672297bb488088dd8f40ec799bf
Parents: d94cd1a
Author: Pei-Lun Lee <pllee@appier.com>
Authored: Tue Apr 28 16:50:18 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Tue Apr 28 16:50:18 2015 +0800

----------------------------------------------------------------------
 .../parquet/DirectParquetOutputCommitter.scala  | 73 ++++++++++++++++++++
 .../sql/parquet/ParquetTableOperations.scala    | 21 ++++++
 .../spark/sql/parquet/ParquetIOSuite.scala      | 22 ++++++
 3 files changed, 116 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e13cd865/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
new file mode 100644
index 0000000..f5ce271
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import parquet.Log
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
+
+/**
+ * A [[ParquetOutputCommitter]] that reports the final output path as its work path, so nothing
+ * is staged in a temporary directory. Job/task setup, task commit and task abort are no-ops.
+ *
+ * NOTE(review): because `abortTask` does nothing, files written by a failed task attempt are not
+ * removed by this committer -- presumably acceptable for the S3 use case; confirm with callers.
+ *
+ * @param outputPath final output directory; also returned by `getWorkPath`
+ * @param context    task attempt context, forwarded to the parent committer
+ */
+private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+  // NOTE(review): the logger is obtained for ParquetOutputCommitter, not for this class.
+  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
+
+  // Files are written in place, so the work path is the output path itself.
+  override def getWorkPath(): Path = outputPath
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+  override def setupJob(jobContext: JobContext): Unit = {}
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
+
+  /**
+   * Finishes the job with two independent, best-effort steps:
+   *
+   *  1. if `ParquetOutputFormat.ENABLE_JOB_SUMMARY` is true (the default used here), read all
+   *     footers under the output path and write the summary metadata file;
+   *  2. if "mapreduce.fileoutputcommitter.marksuccessfuljobs" is true (the default used here),
+   *     create the empty success marker file.
+   *
+   * A failure in either step is logged as a warning and is not propagated to the caller.
+   *
+   * @param jobContext context of the job being committed; supplies the configuration
+   */
+  override def commitJob(jobContext: JobContext): Unit = {
+    val configuration = ContextUtil.getConfiguration(jobContext)
+    val fileSystem = outputPath.getFileSystem(configuration)
+
+    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+      try {
+        val outputStatus = fileSystem.getFileStatus(outputPath)
+        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+        try {
+          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
+        } catch {
+          case e: Exception =>
+            LOG.warn("could not write summary file for " + outputPath, e)
+            // Remove whatever summary file may have been left behind by the failed write.
+            val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+            if (fileSystem.exists(metadataPath)) {
+              fileSystem.delete(metadataPath, true)
+            }
+        }
+      } catch {
+        // Also reached when reading the footers, or the cleanup above, throws.
+        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
+      }
+    }
+
+    if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+      try {
+        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
+        fileSystem.create(successPath).close()
+      } catch {
+        case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cd865/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index a938b77..aded126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -381,6 +381,7 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   extends parquet.hadoop.ParquetOutputFormat[Row] {
   // override to accept existing directories as valid output directory
   override def checkOutputSpecs(job: JobContext): Unit = {}
+  var committer: OutputCommitter = null
 
   // override to choose output filename so not overwrite existing ones
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -403,6 +404,26 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
     context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
   }
+
+  /**
+   * Returns the output committer for this format, creating it on the first call.
+   *
+   * The committer class is read from "spark.sql.parquet.output.committer.class", falling back
+   * to [[ParquetOutputCommitter]], and is instantiated reflectively through its
+   * `(Path, TaskAttemptContext)` constructor. The instance is kept in `committer` and returned
+   * as-is by every later call.
+   */
+  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (committer != null) {
+      committer
+    } else {
+      val targetPath = getOutputPath(context)
+      val committerClass = context.getConfiguration.getClass(
+        "spark.sql.parquet.output.committer.class",
+        classOf[ParquetOutputCommitter],
+        classOf[ParquetOutputCommitter])
+      val constructor =
+        committerClass.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+      committer = constructor.newInstance(targetPath, context).asInstanceOf[ParquetOutputCommitter]
+      committer
+    }
+  }
+
+  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2, so the
+  // output directory is read directly from the "mapred.output.dir" setting instead.
+  // Yields null when that setting is absent.
+  private def getOutputPath(context: TaskAttemptContext): Path = {
+    val outputDir = Option(context.getConfiguration().get("mapred.output.dir"))
+    outputDir.map(new Path(_)).orNull
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cd865/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 97c0f43..b504842 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -381,6 +381,28 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-6352 DirectParquetOutputCommitter") {
+    val committerKey = "spark.sql.parquet.output.committer.class"
+    val directCommitter = "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"
+    val defaultCommitter = "parquet.hadoop.ParquetOutputCommitter"
+
+    // Save a query that is expected to fail (its UDF divides by zero) with the direct committer
+    // configured. If the direct committer is in effect, no _temporary directory should exist
+    // under the output path afterwards.
+    try {
+      configuration.set(committerKey, directCommitter)
+      sqlContext.udf.register("div0", (x: Int) => x / 0)
+      withTempPath { dir =>
+        intercept[org.apache.spark.SparkException] {
+          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
+        }
+        val temporaryPath = new Path(dir.getCanonicalPath, "_temporary")
+        val fileSystem = temporaryPath.getFileSystem(configuration)
+        assert(!fileSystem.exists(temporaryPath))
+      }
+    } finally {
+      // Point the setting back at the stock Parquet committer once the test is done.
+      configuration.set(committerKey, defaultCommitter)
+    }
+  }
 }
 
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message