spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-18012][SQL] Simplify WriterContainer
Date Thu, 20 Oct 2016 05:22:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4b2011ec9 -> f313117bc


[SPARK-18012][SQL] Simplify WriterContainer

## What changes were proposed in this pull request?
This patch refactors WriterContainer to simplify the logic and make control flow more obvious. The
previous code setup made it pretty difficult to track the actual dependencies on variables
and setups because the driver side and the executor side were using the same set of variables.

## How was this patch tested?
N/A - this should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15551 from rxin/writercontainer-refactor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f313117b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f313117b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f313117b

Branch: refs/heads/master
Commit: f313117bc93b0bf560528b316d3e6947caa96296
Parents: 4b2011e
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed Oct 19 22:22:35 2016 -0700
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Oct 19 22:22:35 2016 -0700

----------------------------------------------------------------------
 .../InsertIntoHadoopFsRelationCommand.scala     |  79 +--
 .../sql/execution/datasources/WriteOutput.scala | 480 +++++++++++++++++++
 .../execution/datasources/WriterContainer.scala | 445 -----------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 -
 4 files changed, 492 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 99ca3df..22dbe71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -20,18 +20,12 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
-import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
@@ -40,20 +34,6 @@ import org.apache.spark.sql.internal.SQLConf
  * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
  * unique file path for each task output file.  This UUID is passed to executor side via
a
  * property named `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- *   1. Driver side setup, including output committer initialization and data source specific
- *      preparation work for the write job to be issued.
- *   2. Issues a write job consists of one or more executor side tasks, each of which writes
all
- *      rows within an RDD partition.
- *   3. If no exception is thrown in a task, commits that task, otherwise aborts that task;
 If any
- *      exception is thrown during task commitment, also aborts that task.
- *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception
is
- *      thrown during job commitment, also aborts the job.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
@@ -103,52 +83,17 @@ case class InsertIntoHadoopFsRelationCommand(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val job = Job.getInstance(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      val partitionSet = AttributeSet(partitionColumns)
-      val dataColumns = query.output.filterNot(partitionSet.contains)
-
-      val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
-      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-        val relation =
-          WriteRelation(
-            sparkSession,
-            dataColumns.toStructType,
-            qualifiedOutputPath.toString,
-            fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
-            bucketSpec)
-
-        val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty)
{
-          new DefaultWriterContainer(relation, job, isAppend)
-        } else {
-          new DynamicPartitionWriterContainer(
-            relation,
-            job,
-            partitionColumns = partitionColumns,
-            dataColumns = dataColumns,
-            inputSchema = query.output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.sessionState.conf.partitionMaxFiles,
-            isAppend)
-        }
-
-        // This call shouldn't be put into the `try` block below because it only initializes
and
-        // prepares the job, any exception thrown from here shouldn't cause abortJob() to
be called.
-        writerContainer.driverSideSetup()
-
-        try {
-          sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows
_)
-          writerContainer.commitJob()
-          refreshFunction()
-        } catch { case cause: Throwable =>
-          logError("Aborting job.", cause)
-          writerContainer.abortJob()
-          throw new SparkException("Job aborted.", cause)
-        }
-      }
+      WriteOutput.write(
+        sparkSession,
+        query,
+        fileFormat,
+        qualifiedOutputPath,
+        hadoopConf,
+        partitionColumns,
+        bucketSpec,
+        refreshFunction,
+        options,
+        isAppend)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
new file mode 100644
index 0000000..54d0f3b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/** A helper object for writing data out to a location. */
+object WriteOutput extends Logging {
+
+  /** A shared job description for all the write tasks. */
+  private class WriteJobDescription(
+      val serializableHadoopConf: SerializableConfiguration,
+      val outputWriterFactory: OutputWriterFactory,
+      val allColumns: Seq[Attribute],
+      val partitionColumns: Seq[Attribute],
+      val nonPartitionColumns: Seq[Attribute],
+      val bucketSpec: Option[BucketSpec],
+      val isAppend: Boolean,
+      val path: String,
+      val outputFormatClass: Class[_ <: OutputFormat[_, _]])
+    extends Serializable {
+
+    assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
+      s"""
+         |All columns: ${allColumns.mkString(", ")}
+         |Partition columns: ${partitionColumns.mkString(", ")}
+         |Non-partition columns: ${nonPartitionColumns.mkString(", ")}
+       """.stripMargin)
+  }
+
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, including output committer initialization and data source specific
+   *    preparation work for the write job to be issued.
+   * 2. Issues a write job consists of one or more executor side tasks, each of which writes
all
+   *    rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task;
 If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception
is
+   *    thrown during job commitment, also aborts the job.
+   */
+  def write(
+      sparkSession: SparkSession,
+      plan: LogicalPlan,
+      fileFormat: FileFormat,
+      outputPath: Path,
+      hadoopConf: Configuration,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      refreshFunction: () => Unit,
+      options: Map[String, String],
+      isAppend: Boolean): Unit = {
+
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    FileOutputFormat.setOutputPath(job, outputPath)
+
+    val partitionSet = AttributeSet(partitionColumns)
+    val dataColumns = plan.output.filterNot(partitionSet.contains)
+    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+
+    // Note: prepareWrite has side effect. It sets "job".
+    val outputWriterFactory =
+      fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
+
+    val description = new WriteJobDescription(
+      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
+      outputWriterFactory = outputWriterFactory,
+      allColumns = plan.output,
+      partitionColumns = partitionColumns,
+      nonPartitionColumns = dataColumns,
+      bucketSpec = bucketSpec,
+      isAppend = isAppend,
+      path = outputPath.toString,
+      outputFormatClass = job.getOutputFormatClass)
+
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      // This call shouldn't be put into the `try` block below because it only initializes
and
+      // prepares the job, any exception thrown from here shouldn't cause abortJob() to be
called.
+      val committer = setupDriverCommitter(job, outputPath.toString, isAppend)
+
+      try {
+        sparkSession.sparkContext.runJob(queryExecution.toRdd,
+          (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+            executeTask(
+              description = description,
+              sparkStageId = taskContext.stageId(),
+              sparkPartitionId = taskContext.partitionId(),
+              sparkAttemptNumber = taskContext.attemptNumber(),
+              iterator = iter)
+          })
+
+        committer.commitJob(job)
+        logInfo(s"Job ${job.getJobID} committed.")
+        refreshFunction()
+      } catch { case cause: Throwable =>
+        logError(s"Aborting job ${job.getJobID}.", cause)
+        committer.abortJob(job, JobStatus.State.FAILED)
+        throw new SparkException("Job aborted.", cause)
+      }
+    }
+  }
+
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
+      description: WriteJobDescription,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      iterator: Iterator[InternalRow]): Unit = {
+
+    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapred.job.id", jobId.toString)
+      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapred.task.is.map", true)
+      hadoopConf.setInt("mapred.task.partition", 0)
+
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    val committer = newOutputCommitter(
+      description.outputFormatClass, taskAttemptContext, description.path, description.isAppend)
+    committer.setupTask(taskAttemptContext)
+
+    // Figure out where we need to write data to for staging.
+    // For FileOutputCommitter it has its own staging path called "work path".
+    val stagingPath = committer match {
+      case f: FileOutputCommitter => f.getWorkPath.toString
+      case _ => description.path
+    }
+
+    val writeTask =
+      if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty)
{
+        new SingleDirectoryWriteTask(description, taskAttemptContext, stagingPath)
+      } else {
+        new DynamicPartitionWriteTask(description, taskAttemptContext, stagingPath)
+      }
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        // Execute the task to write rows out
+        writeTask.execute(iterator)
+        writeTask.releaseResources()
+
+        // Commit the task
+        SparkHadoopMapRedUtil.commitTask(committer, taskAttemptContext, jobId.getId, taskId.getId)
+      })(catchBlock = {
+        // If there is an error, release resource and then abort the task
+        try {
+          writeTask.releaseResources()
+        } finally {
+          committer.abortTask(taskAttemptContext)
+          logError(s"Job $jobId aborted.")
+        }
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
+    }
+  }
+
+  /**
+   * A simple trait for writing out data in a single Spark task, without any concerns about
how
+   * to commit or abort tasks. Exceptions thrown by the implementation of this trait will
+   * automatically trigger task aborts.
+   */
+  private trait ExecuteWriteTask {
+    def execute(iterator: Iterator[InternalRow]): Unit
+    def releaseResources(): Unit
+  }
+
+  /** Writes data to a single directory (used for non-dynamic-partition writes). */
+  private class SingleDirectoryWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      stagingPath: String) extends ExecuteWriteTask {
+
+    private[this] var outputWriter: OutputWriter = {
+      val outputWriter = description.outputWriterFactory.newInstance(
+        path = stagingPath,
+        bucketId = None,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
+      outputWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Unit = {
+      while (iter.hasNext) {
+        val internalRow = iter.next()
+        outputWriter.writeInternal(internalRow)
+      }
+    }
+
+    override def releaseResources(): Unit = {
+      if (outputWriter != null) {
+        outputWriter.close()
+        outputWriter = null
+      }
+    }
+  }
+
+  /**
+   * Writes data to using dynamic partition writes, meaning this single function can write
to
+   * multiple directories (partitions) or files (bucketing).
+   */
+  private class DynamicPartitionWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      stagingPath: String) extends ExecuteWriteTask {
+
+    // currentWriter is initialized whenever we see a new key
+    private var currentWriter: OutputWriter = _
+
+    private val bucketColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
+      spec => spec.bucketColumnNames.map(c => description.allColumns.find(_.name ==
c).get)
+    }
+
+    private val sortColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
+      spec => spec.sortColumnNames.map(c => description.allColumns.find(_.name == c).get)
+    }
+
+    private def bucketIdExpression: Option[Expression] = description.bucketSpec.map { spec
=>
+      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that
we can
+      // guarantee the data distribution is same between shuffle and bucketed data source,
which
+      // enables us to only shuffle one side when join a bucketed table and a normal one.
+      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
+    }
+
+    /** Expressions that given a partition key build a string like: col1=val/col2=val/...
*/
+    private def partitionStringExpression: Seq[Expression] = {
+      description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        val escaped = ScalaUDF(
+          PartitioningUtils.escapePathName _,
+          StringType,
+          Seq(Cast(c, StringType)),
+          Seq(StringType))
+        val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+        val partitionName = Literal(c.name + "=") :: str :: Nil
+        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+      }
+    }
+
+    private def getBucketIdFromKey(key: InternalRow): Option[Int] =
+      description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length)
}
+
+    /**
+     * Open and returns a new OutputWriter given a partition key and optional bucket id.
+     * If bucket id is specified, we will append it to the end of the file name, but before
the
+     * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
+     */
+    private def newOutputWriter(
+        key: InternalRow,
+        getPartitionString: UnsafeProjection): OutputWriter = {
+      val path =
+        if (description.partitionColumns.nonEmpty) {
+          val partitionPath = getPartitionString(key).getString(0)
+          new Path(stagingPath, partitionPath).toString
+        } else {
+          stagingPath
+        }
+      val bucketId = getBucketIdFromKey(key)
+
+      val newWriter = description.outputWriterFactory.newInstance(
+        path = path,
+        bucketId = bucketId,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      newWriter.initConverter(description.nonPartitionColumns.toStructType)
+      newWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Unit = {
+      // We should first sort by partition columns, then bucket id, and finally sorting columns.
+      val sortingExpressions: Seq[Expression] =
+      description.partitionColumns ++ bucketIdExpression ++ sortColumns
+      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
+
+      val sortingKeySchema = StructType(sortingExpressions.map {
+        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
+        // The sorting expressions are all `Attribute` except bucket id.
+        case _ => StructField("bucketId", IntegerType, nullable = false)
+      })
+
+      // Returns the data columns to be written given an input row
+      val getOutputRow = UnsafeProjection.create(
+        description.nonPartitionColumns, description.allColumns)
+
+      // Returns the partition path given a partition key.
+      val getPartitionString =
+      UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
+
+      // Sorts the data before write, so that we only need one writer at the same time.
+      val sorter = new UnsafeKVExternalSorter(
+        sortingKeySchema,
+        StructType.fromAttributes(description.nonPartitionColumns),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+
+      while (iter.hasNext) {
+        val currentRow = iter.next()
+        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+      }
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
+        identity
+      } else {
+        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map
{
+          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
+        })
+      }
+
+      val sortedIterator = sorter.sortedIterator()
+
+      // If anything below fails, we should abort the task.
+      var currentKey: UnsafeRow = null
+      while (sortedIterator.next()) {
+        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+          currentKey = nextKey.copy()
+          logDebug(s"Writing partition: $currentKey")
+
+          currentWriter = newOutputWriter(currentKey, getPartitionString)
+        }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+    }
+
+    override def releaseResources(): Unit = {
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+    }
+  }
+
+  private def setupDriverCommitter(job: Job, path: String, isAppend: Boolean): OutputCommitter
= {
+    // Setup IDs
+    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+    // Set up the configuration object
+    job.getConfiguration.set("mapred.job.id", jobId.toString)
+    job.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    job.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
+    job.getConfiguration.setBoolean("mapred.task.is.map", true)
+    job.getConfiguration.setInt("mapred.task.partition", 0)
+
+    // This UUID is sent to executor side together with the serialized `Configuration` object
within
+    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to
generate
+    // unique task output files.
+    // This UUID is used to avoid output file name collision between different appending
write jobs.
+    // These jobs may belong to different SparkContext instances. Concrete data source
+    // implementations may use this UUID to generate unique file names (e.g.,
+    // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used
to identify a job
+    // rather than a single task output file is that, speculative tasks must generate the
same
+    // output file name as the original task.
+    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
+
+    val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
+    val outputCommitter = newOutputCommitter(
+      job.getOutputFormatClass, taskAttemptContext, path, isAppend)
+    outputCommitter.setupJob(job)
+    outputCommitter
+  }
+
+  private def newOutputCommitter(
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      context: TaskAttemptContext,
+      path: String,
+      isAppend: Boolean): OutputCommitter = {
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      // See SPARK-8578 for more details
+      logInfo(
+        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName}
" +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val configuration = context.getConfiguration
+      val clazz =
+        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      if (clazz != null) {
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(path), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      } else {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      }
+    }
+  }
+}
+
+object WriterContainer {
+  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
deleted file mode 100644
index 253aa44..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-
-/** A container for all the details required when writing to a table. */
-private[datasources] case class WriteRelation(
-    sparkSession: SparkSession,
-    dataSchema: StructType,
-    path: String,
-    prepareJobForWrite: Job => OutputWriterFactory,
-    bucketSpec: Option[BucketSpec])
-
-object WriterContainer {
-  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}
-
-private[datasources] abstract class BaseWriterContainer(
-    @transient val relation: WriteRelation,
-    @transient private val job: Job,
-    isAppend: Boolean)
-  extends Logging with Serializable {
-
-  protected val dataSchema = relation.dataSchema
-
-  protected val serializableConf =
-    new SerializableConfiguration(job.getConfiguration)
-
-  // This UUID is used to avoid output file name collision between different appending write jobs.
-  // These jobs may belong to different SparkContext instances. Concrete data source implementations
-  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
-  //  The reason why this ID is used to identify a job rather than a single task output file is
-  // that, speculative tasks must generate the same output file name as the original task.
-  private val uniqueWriteJobId = UUID.randomUUID()
-
-  // This is only used on driver side.
-  @transient private val jobContext: JobContext = job
-
-  // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: OutputCommitter = _
-  @transient private var jobId: JobID = _
-  @transient private var taskId: TaskID = _
-  @transient private var taskAttemptId: TaskAttemptID = _
-  @transient protected var taskAttemptContext: TaskAttemptContext = _
-
-  protected val outputPath: String = relation.path
-
-  protected var outputWriterFactory: OutputWriterFactory = _
-
-  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit
-
-  def driverSideSetup(): Unit = {
-    setupIDs(0, 0, 0)
-    setupConf()
-
-    // This UUID is sent to executor side together with the serialized `Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
-    // unique task output files.
-    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
-
-    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
-    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
-    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
-    //
-    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
-    // committer, since their initialization involve the job configuration, which can be potentially
-    // decorated in `prepareJobForWrite`.
-    outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
-
-    outputFormatClass = job.getOutputFormatClass
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupJob(jobContext)
-  }
-
-  def executorSideSetup(taskContext: TaskContext): Unit = {
-    setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
-    setupConf()
-    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupTask(taskAttemptContext)
-  }
-
-  protected def getWorkPath: String = {
-    outputCommitter match {
-      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
-      case _ => outputPath
-    }
-  }
-
-  protected def newOutputWriter(path: String, bucketId: Option[Int] = None): OutputWriter = {
-    try {
-      outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
-    } catch {
-      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.getClass.getName.contains("Direct")) {
-          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
-          // attempts, the task will fail because the output file is created from a prior attempt.
-          // This often means the most visible error to the user is misleading. Augment the error
-          // to tell the user to look for the actual error.
-          throw new SparkException("The output file already exists but this could be due to a " +
-            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n  File exists error: " + e, e)
-        } else {
-          throw e
-        }
-    }
-  }
-
-  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      //
-      // See SPARK-8578 for more details
-      logInfo(
-        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
-          "for appending.")
-      defaultOutputCommitter
-    } else {
-      val configuration = context.getConfiguration
-      val committerClass = configuration.getClass(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      Option(committerClass).map { clazz =>
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          ctor.newInstance(new Path(outputPath), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          ctor.newInstance()
-        }
-      }.getOrElse {
-        // If output committer class is not set, we will use the one associated with the
-        // file output format.
-        logInfo(
-          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
-        defaultOutputCommitter
-      }
-    }
-  }
-
-  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
-    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId)
-    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
-  }
-
-  private def setupConf(): Unit = {
-    serializableConf.value.set("mapred.job.id", jobId.toString)
-    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableConf.value.setBoolean("mapred.task.is.map", true)
-    serializableConf.value.setInt("mapred.task.partition", 0)
-  }
-
-  def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
-  }
-
-  def abortTask(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortTask(taskAttemptContext)
-    }
-    logError(s"Task attempt $taskAttemptId aborted.")
-  }
-
-  def commitJob(): Unit = {
-    outputCommitter.commitJob(jobContext)
-    logInfo(s"Job $jobId committed.")
-  }
-
-  def abortJob(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
-    }
-    logError(s"Job $jobId aborted.")
-  }
-}
-
-/**
- * A writer that writes all of the rows in a partition to a single file.
- */
-private[datasources] class DefaultWriterContainer(
-    relation: WriteRelation,
-    job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    executorSideSetup(taskContext)
-    var writer = newOutputWriter(getWorkPath)
-    writer.initConverter(dataSchema)
-
-    // If anything below fails, we should abort the task.
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writer.writeInternal(internalRow)
-        }
-        commitTask()
-      }(catchBlock = abortTask())
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
-
-    def commitTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
-          writer = null
-        }
-        super.commitTask()
-      } catch {
-        case cause: Throwable =>
-          // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
-          // will cause `abortTask()` to be invoked.
-          throw new RuntimeException("Failed to commit task", cause)
-      }
-    }
-
-    def abortTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
-        }
-      } finally {
-        super.abortTask()
-      }
-    }
-  }
-}
-
-/**
- * A writer that dynamically opens files based on the given partition columns.  Internally this is
- * done by maintaining a HashMap of open files until `maxFiles` is reached.  If this occurs, the
- * writer externally sorts the remaining rows and then writes out them out one file at a time.
- */
-private[datasources] class DynamicPartitionWriterContainer(
-    relation: WriteRelation,
-    job: Job,
-    partitionColumns: Seq[Attribute],
-    dataColumns: Seq[Attribute],
-    inputSchema: Seq[Attribute],
-    defaultPartitionName: String,
-    maxOpenFiles: Int,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  private val bucketSpec = relation.bucketSpec
-
-  private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
-    spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
-  }
-
-  private val sortColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
-    spec => spec.sortColumnNames.map(c => inputSchema.find(_.name == c).get)
-  }
-
-  private def bucketIdExpression: Option[Expression] = bucketSpec.map { spec =>
-    // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
-    // guarantee the data distribution is same between shuffle and bucketed data source, which
-    // enables us to only shuffle one side when join a bucketed table and a normal one.
-    HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-  }
-
-  // Expressions that given a partition key build a string like: col1=val/col2=val/...
-  private def partitionStringExpression: Seq[Expression] = {
-    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-      val escaped =
-        ScalaUDF(
-          PartitioningUtils.escapePathName _,
-          StringType,
-          Seq(Cast(c, StringType)),
-          Seq(StringType))
-      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
-      val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
-    }
-  }
-
-  private def getBucketIdFromKey(key: InternalRow): Option[Int] = bucketSpec.map { _ =>
-    key.getInt(partitionColumns.length)
-  }
-
-  /**
-   * Open and returns a new OutputWriter given a partition key and optional bucket id.
-   * If bucket id is specified, we will append it to the end of the file name, but before the
-   * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
-   */
-  private def newOutputWriter(
-      key: InternalRow,
-      getPartitionString: UnsafeProjection): OutputWriter = {
-    val path = if (partitionColumns.nonEmpty) {
-      val partitionPath = getPartitionString(key).getString(0)
-      new Path(getWorkPath, partitionPath).toString
-    } else {
-      getWorkPath
-    }
-    val bucketId = getBucketIdFromKey(key)
-    val newWriter = super.newOutputWriter(path, bucketId)
-    newWriter.initConverter(dataSchema)
-    newWriter
-  }
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    executorSideSetup(taskContext)
-
-    // We should first sort by partition columns, then bucket id, and finally sorting columns.
-    val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns
-    val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema)
-
-    val sortingKeySchema = StructType(sortingExpressions.map {
-      case a: Attribute => StructField(a.name, a.dataType, a.nullable)
-      // The sorting expressions are all `Attribute` except bucket id.
-      case _ => StructField("bucketId", IntegerType, nullable = false)
-    })
-
-    // Returns the data columns to be written given an input row
-    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
-
-    // Returns the partition path given a partition key.
-    val getPartitionString =
-      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
-
-    // Sorts the data before write, so that we only need one writer at the same time.
-    // TODO: inject a local sort operator in planning.
-    val sorter = new UnsafeKVExternalSorter(
-      sortingKeySchema,
-      StructType.fromAttributes(dataColumns),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
-    while (iterator.hasNext) {
-      val currentRow = iterator.next()
-      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-    }
-    logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-    val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
-      identity
-    } else {
-      UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
-        case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
-      })
-    }
-
-    val sortedIterator = sorter.sortedIterator()
-
-    // If anything below fails, we should abort the task.
-    var currentWriter: OutputWriter = null
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        var currentKey: UnsafeRow = null
-        while (sortedIterator.next()) {
-          val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-          if (currentKey != nextKey) {
-            if (currentWriter != null) {
-              currentWriter.close()
-              currentWriter = null
-            }
-            currentKey = nextKey.copy()
-            logDebug(s"Writing partition: $currentKey")
-
-            currentWriter = newOutputWriter(currentKey, getPartitionString)
-          }
-          currentWriter.writeInternal(sortedIterator.getValue)
-        }
-        if (currentWriter != null) {
-          currentWriter.close()
-          currentWriter = null
-        }
-
-        commitTask()
-      }(catchBlock = {
-        if (currentWriter != null) {
-          currentWriter.close()
-        }
-        abortTask()
-      })
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8afd39d..9061b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -339,13 +339,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val PARTITION_MAX_FILES =
-    SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
-      .doc("The maximum number of concurrent files to open before falling back on sorting when " +
-            "writing out files using dynamic partitioning.")
-      .intConf
-      .createWithDefault(1)
-
   val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
     .doc("When false, we will treat bucketed table as normal table")
     .booleanConf
@@ -733,8 +726,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
-  def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
-
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message