From: marmbrus@apache.org
To: commits@spark.apache.org
Message-Id: <92dbde0feb2d4d548c0a8f6a6bc9cad8@git.apache.org>
Subject: spark git commit: [SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis
Date: Mon, 18 May 2015 19:17:28 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 d6f5f3791 -> a385f4b8d


[SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis

cc liancheng marmbrus

Author: Yin Huai

Closes #6130 from yhuai/directOutput and squashes the following commits:

312b07d [Yin Huai] A data source can use spark.sql.sources.outputCommitterClass to override the output committer.

(cherry picked from commit 530397ba2f5c0fcabb86ba73048c95177ed0b9fc)
Signed-off-by: Michael Armbrust


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a385f4b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a385f4b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a385f4b8

Branch: refs/heads/branch-1.4
Commit: a385f4b8dd22e0e056569cffc4fa63047cb7c8f2
Parents: d6f5f37
Author: Yin Huai
Authored: Mon May 18 12:17:10 2015 -0700
Committer: Michael Armbrust
Committed: Mon May 18 12:17:22 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 +++
 .../apache/spark/sql/parquet/newParquet.scala   |  2 +-
 .../org/apache/spark/sql/sources/commands.scala | 29 +++++++++++++++-----
 .../apache/spark/sql/sources/interfaces.scala   |  3 +-
 4 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
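[Editor's note, for context before the diffs: the sketch below is not part of the commit. It shows how a client might opt into the new flag. The committer class name and output path are hypothetical; the named class must be a subclass of org.apache.hadoop.mapreduce.OutputCommitter and be present on the classpath.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object OutputCommitterFlagSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("committer-flag-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Point the new flag at a subclass of org.apache.hadoop.mapreduce.OutputCommitter.
    // "com.example.DirectOutputCommitter" is a hypothetical user-supplied class.
    sqlContext.setConf("spark.sql.sources.outputCommitterClass",
      "com.example.DirectOutputCommitter")

    // A write through a HadoopFsRelation-based source can now resolve the
    // committer from the flag instead of the output format's default.
    val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "value")
    df.write.parquet("/tmp/committer-flag-demo") // hypothetical output path
    sc.stop()
  }
}

[Note that a data source can also install its own value under this key, as the newParquet.scala hunk below does, so the effective committer is ultimately the data source's choice.]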
http://git-wip-us.apache.org/repos/asf/spark/blob/a385f4b8/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6da910e..77c6af2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,10 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"


http://git-wip-us.apache.org/repos/asf/spark/blob/a385f4b8/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bcbdb1e..fea54a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
       classOf[ParquetOutputCommitter])
 
     conf.setClass(
-      "mapred.output.committer.class",
+      SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,
       classOf[ParquetOutputCommitter])
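[Editor's note, before the commands.scala hunks below: the new instantiation logic picks a constructor by reflection, so a committer that extends OutputCommitter directly must expose a no-argument constructor. A minimal sketch of such a class follows; it is hypothetical and not part of the commit.]

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical committer used only for illustration. Because it is not a
// FileOutputCommitter subclass, the reflection logic in the following diff
// would instantiate it via clazz.getDeclaredConstructor() with no arguments.
class NoOpOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

[A FileOutputCommitter subclass, by contrast, is constructed with (Path, TaskAttemptContext), matching its superclass constructor, as the diff below shows.]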
http://git-wip-us.apache.org/repos/asf/spark/blob/a385f4b8/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a09bb08..d54dbb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
   protected def getWorkPath: String = {
     outputCommitter match {
       // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: FileOutputCommitter => f.getWorkPath.toString
+      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
       case _ => outputPath
     }
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      "mapred.output.committer.class", null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
-      val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      ctor.newInstance(new Path(outputPath), context)
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(outputPath), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
+      }
     }.getOrElse {
+      // If the output committer class is not set, we will use the one associated with the
+      // file output format.
       outputFormatClass.newInstance().getOutputCommitter(context)
     }
   }
 
-
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)


http://git-wip-us.apache.org/repos/asf/spark/blob/a385f4b8/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 274ab44..a82a675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
-   * be put here. For example, user defined output committer can be configured here.
+   * be put here. For example, user defined output committer can be configured here
+   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    *
    * Note that the only side effect expected here is mutating `job` via its setters. Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
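[Editor's note, as a companion to the updated scaladoc above: a data source could also push a committer into the job configuration during write preparation. A minimal sketch follows, assuming the hypothetical NoOpOutputCommitter from the earlier sketch; the object and method names are illustrative, not a real Spark API.]

import org.apache.hadoop.mapreduce.{Job, OutputCommitter}

object CommitterSetupSketch {
  // Sets the committer class that BaseWriterContainer.newOutputCommitter
  // will later read back from the task's configuration. `job` is the Hadoop
  // Job handed to the relation's write-preparation hook.
  def overrideCommitter(job: Job): Unit = {
    job.getConfiguration.setClass(
      "spark.sql.sources.outputCommitterClass", // the key held by SQLConf.OUTPUT_COMMITTER_CLASS
      classOf[NoOpOutputCommitter],
      classOf[OutputCommitter])
  }
}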