spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-18021][SQL] Refactor file name specification for data sources
Date Thu, 20 Oct 2016 19:19:00 GMT
Repository: spark
Updated Branches:
  refs/heads/master 947f4f252 -> 7f9ec19ea


[SPARK-18021][SQL] Refactor file name specification for data sources

## What changes were proposed in this pull request?
Currently each data source OutputWriter is responsible for specifying the entire file name
for each file output. This, however, does not make any sense because we rely on file naming
schemes for certain behaviors in Spark SQL, e.g. bucket id. The current approach allows individual
data sources to break the implementation of bucketing.

On the flip side, we also don't want to move file naming entirely out of data sources, because
different data sources do want to specify different extensions.

This patch divides file name specification into two parts: the first part is a prefix specified
by the caller of OutputWriter (in WriteOutput), and the second part is the suffix that can
be specified by the OutputWriter itself. Note that a side effect of this change is that now
all file-based data sources also support bucketing automatically.

There are also some other minor cleanups:

- Removed the UUID passed through a generic Configuration string
- Some minor rewrites for better clarity
- Renamed "path" in multiple places to "stagingDir", to more accurately reflect its meaning

## How was this patch tested?
This should be covered by existing data source tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15562 from rxin/SPARK-18021.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f9ec19e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f9ec19e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f9ec19e

Branch: refs/heads/master
Commit: 7f9ec19eae60abe589ffd22259a9065e7e353a57
Parents: 947f4f2
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Oct 20 12:18:56 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Oct 20 12:18:56 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala | 16 +++---
 .../execution/datasources/OutputWriter.scala    | 17 +++---
 .../sql/execution/datasources/WriteOutput.scala | 56 +++++++++-----------
 .../execution/datasources/csv/CSVRelation.scala | 18 +++----
 .../datasources/json/JsonFileFormat.scala       | 17 +++---
 .../datasources/parquet/ParquetFileFormat.scala |  7 ++-
 .../parquet/ParquetOutputWriter.scala           | 32 +++--------
 .../datasources/text/TextFileFormat.scala       | 21 ++++----
 .../spark/sql/hive/orc/OrcFileFormat.scala      | 21 ++++----
 .../spark/sql/sources/BucketedWriteSuite.scala  |  5 --
 .../sql/sources/CommitFailureTestSource.scala   |  6 +--
 .../spark/sql/sources/SimpleTextRelation.scala  | 26 +++++----
 12 files changed, 99 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 8577803..fff8668 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -40,7 +40,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 private[libsvm] class LibSVMOutputWriter(
-    path: String,
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter {
@@ -50,11 +51,7 @@ private[libsvm] class LibSVMOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path
= {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        new Path(stagingDir, fileNamePrefix + extension)
       }
     }.getRecordWriter(context)
   }
@@ -132,12 +129,11 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with
DataSour
       dataSchema: StructType): OutputWriterFactory = {
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
-        new LibSVMOutputWriter(path, dataSchema, context)
+        new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
index d2eec7b..f4cefda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
@@ -34,18 +34,23 @@ abstract class OutputWriterFactory extends Serializable {
    * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor
side
    * to instantiate new [[OutputWriter]]s.
    *
-   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note
that
-   *        this may not point to the final output file.  For example, `FileOutputFormat`
writes to
-   *        temporary directories and then merge written files back to the final destination.
 In
-   *        this case, `path` points to a temporary output file under the temporary directory.
+   * @param stagingDir Base path (directory) of the file to which this [[OutputWriter]] is
supposed
+   *                   to write.  Note that this may not point to the final output file.
 For
+   *                   example, `FileOutputFormat` writes to temporary directories and then
merge
+   *                   written files back to the final destination.  In this case, `stagingDir`
points to
+   *                   the temporary directory under which the output file is created.
+   * @param fileNamePrefix Prefix of the file name. The returned OutputWriter must make sure
this
+   *                       prefix is used in the actual file name. For example, if the prefix
is
+   *                       "part-1-2-3", then the file name must start with "part-1-2-3"
but can
+   *                       end in arbitrary extension.
    * @param dataSchema Schema of the rows to be written. Partition columns are not included
in the
    *        schema if the relation being written is partitioned.
    * @param context The Hadoop MapReduce task context.
    * @since 1.4.0
    */
   def newInstance(
-      path: String,
-      bucketId: Option[Int], // TODO: This doesn't belong here...
+      stagingDir: String,
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
index 54d0f3b..bd56e51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
@@ -46,6 +46,7 @@ object WriteOutput extends Logging {
 
   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
+      val uuid: String,  // prevent collision between different (appending) write jobs
       val serializableHadoopConf: SerializableConfiguration,
       val outputWriterFactory: OutputWriterFactory,
       val allColumns: Seq[Attribute],
@@ -102,6 +103,7 @@ object WriteOutput extends Logging {
       fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
 
     val description = new WriteJobDescription(
+      uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
       allColumns = plan.output,
@@ -213,6 +215,11 @@ object WriteOutput extends Logging {
   private trait ExecuteWriteTask {
     def execute(iterator: Iterator[InternalRow]): Unit
     def releaseResources(): Unit
+
+    final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
+      val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+      f"part-r-$split%05d-$uuid$bucketString"
+    }
   }
 
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -222,9 +229,11 @@ object WriteOutput extends Logging {
       stagingPath: String) extends ExecuteWriteTask {
 
     private[this] var outputWriter: OutputWriter = {
+      val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
+
       val outputWriter = description.outputWriterFactory.newInstance(
-        path = stagingPath,
-        bucketId = None,
+        stagingDir = stagingPath,
+        fileNamePrefix = filePrefix(split, description.uuid, None),
         dataSchema = description.nonPartitionColumns.toStructType,
         context = taskAttemptContext)
       outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
@@ -287,29 +296,31 @@ object WriteOutput extends Logging {
       }
     }
 
-    private def getBucketIdFromKey(key: InternalRow): Option[Int] =
-      description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length)
}
-
     /**
      * Open and returns a new OutputWriter given a partition key and optional bucket id.
      * If bucket id is specified, we will append it to the end of the file name, but before
the
      * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
      */
-    private def newOutputWriter(
-        key: InternalRow,
-        getPartitionString: UnsafeProjection): OutputWriter = {
+    private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter
= {
       val path =
         if (description.partitionColumns.nonEmpty) {
-          val partitionPath = getPartitionString(key).getString(0)
+          val partitionPath = partString(key).getString(0)
           new Path(stagingPath, partitionPath).toString
         } else {
           stagingPath
         }
-      val bucketId = getBucketIdFromKey(key)
 
+      // If the bucket spec is defined, the bucket column is right after the partition columns
+      val bucketId = if (description.bucketSpec.isDefined) {
+        Some(key.getInt(description.partitionColumns.length))
+      } else {
+        None
+      }
+
+      val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
       val newWriter = description.outputWriterFactory.newInstance(
-        path = path,
-        bucketId = bucketId,
+        stagingDir = path,
+        fileNamePrefix = filePrefix(split, description.uuid, bucketId),
         dataSchema = description.nonPartitionColumns.toStructType,
         context = taskAttemptContext)
       newWriter.initConverter(description.nonPartitionColumns.toStructType)
@@ -319,7 +330,7 @@ object WriteOutput extends Logging {
     override def execute(iter: Iterator[InternalRow]): Unit = {
       // We should first sort by partition columns, then bucket id, and finally sorting columns.
       val sortingExpressions: Seq[Expression] =
-      description.partitionColumns ++ bucketIdExpression ++ sortColumns
+        description.partitionColumns ++ bucketIdExpression ++ sortColumns
       val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
 
       val sortingKeySchema = StructType(sortingExpressions.map {
@@ -333,8 +344,8 @@ object WriteOutput extends Logging {
         description.nonPartitionColumns, description.allColumns)
 
       // Returns the partition path given a partition key.
-      val getPartitionString =
-      UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
+      val getPartitionString = UnsafeProjection.create(
+        Seq(Concat(partitionStringExpression)), description.partitionColumns)
 
       // Sorts the data before write, so that we only need one writer at the same time.
       val sorter = new UnsafeKVExternalSorter(
@@ -405,17 +416,6 @@ object WriteOutput extends Logging {
     job.getConfiguration.setBoolean("mapred.task.is.map", true)
     job.getConfiguration.setInt("mapred.task.partition", 0)
 
-    // This UUID is sent to executor side together with the serialized `Configuration` object
within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to
generate
-    // unique task output files.
-    // This UUID is used to avoid output file name collision between different appending
write jobs.
-    // These jobs may belong to different SparkContext instances. Concrete data source
-    // implementations may use this UUID to generate unique file names (e.g.,
-    // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used
to identify a job
-    // rather than a single task output file is that, speculative tasks must generate the
same
-    // output file name as the original task.
-    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
-
     val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
     val outputCommitter = newOutputCommitter(
       job.getOutputFormatClass, taskAttemptContext, path, isAppend)
@@ -474,7 +474,3 @@ object WriteOutput extends Logging {
     }
   }
 }
-
-object WriterContainer {
-  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 55cb26d..eefacbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile,
WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.types._
 
 object CSVRelation extends Logging {
@@ -170,17 +170,17 @@ object CSVRelation extends Logging {
 
 private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory
{
   override def newInstance(
-      path: String,
-      bucketId: Option[Int],
+      stagingDir: String,
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    if (bucketId.isDefined) sys.error("csv doesn't support bucketing")
-    new CsvOutputWriter(path, dataSchema, context, params)
+    new CsvOutputWriter(stagingDir, fileNamePrefix, dataSchema, context, params)
   }
 }
 
 private[csv] class CsvOutputWriter(
-    path: String,
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
@@ -199,11 +199,7 @@ private[csv] class CsvOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path
= {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
+        new Path(stagingDir, s"$fileNamePrefix.csv$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 9fe38cc..cdbb2f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -82,11 +82,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister
{
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
+        new JsonOutputWriter(stagingDir, parsedOptions, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -153,9 +153,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister
{
 }
 
 private[json] class JsonOutputWriter(
-    path: String,
+    stagingDir: String,
     options: JSONOptions,
-    bucketId: Option[Int],
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter with Logging {
@@ -168,12 +168,7 @@ private[json] class JsonOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path
= {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
+        new Path(stagingDir, s"$fileNamePrefix.json$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6faafed..87b944b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -134,10 +133,10 @@ class ParquetFileFormat
     new OutputWriterFactory {
       override def newInstance(
           path: String,
-          bucketId: Option[Int],
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new ParquetOutputWriter(path, bucketId, context)
+        new ParquetOutputWriter(path, fileNamePrefix, context)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
index f89ce05..39c1997 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory,
WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -122,13 +122,12 @@ private[parquet] class ParquetOutputWriterFactory(
   }
 
   /** Disable the use of the older API. */
-  def newInstance(
+  override def newInstance(
       path: String,
-      bucketId: Option[Int],
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    throw new UnsupportedOperationException(
-      "this version of newInstance not supported for " +
+    throw new UnsupportedOperationException("this version of newInstance not supported for
" +
         "ParquetOutputWriterFactory")
   }
 }
@@ -136,33 +135,16 @@ private[parquet] class ParquetOutputWriterFactory(
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[parquet] class ParquetOutputWriter(
-    path: String,
-    bucketId: Option[Int],
+    stagingDir: String,
+    fileNamePrefix: String,
     context: TaskAttemptContext)
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
     val outputFormat = {
       new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file names to avoid
-        //     overwriting existing files (either exist before the write job, or are just
written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of
all
-        //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String):
Path = {
-          val configuration = context.getConfiguration
-          val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-          val taskAttemptId = context.getTaskAttemptID
-          val split = taskAttemptId.getTaskID.getId
-          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-          // It has the `.parquet` extension at the end because (de)compression tools
-          // such as gunzip would not be able to decompress this as the compression
-          // is not applied on this whole file but on each "page" in Parquet format.
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+          new Path(stagingDir, fileNamePrefix + extension)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 9f96667..6cd2351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -73,14 +73,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister
{
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        if (bucketId.isDefined) {
-          throw new AnalysisException("Text doesn't support bucketing")
-        }
-        new TextOutputWriter(path, dataSchema, context)
+        new TextOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -124,7 +121,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister
{
   }
 }
 
-class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
+class TextOutputWriter(
+    stagingDir: String,
+    fileNamePrefix: String,
+    dataSchema: StructType,
+    context: TaskAttemptContext)
   extends OutputWriter {
 
   private[this] val buffer = new Text()
@@ -132,11 +133,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context:
TaskAttemp
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path
= {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
+        new Path(stagingDir, s"$fileNamePrefix.txt$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1af3280..1ceacb4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -83,11 +83,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new OrcOutputWriter(path, bucketId, dataSchema, context)
+        new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -210,8 +210,8 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
 }
 
 private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter {
@@ -226,10 +226,7 @@ private[orc] class OrcOutputWriter(
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-    val taskAttemptId = context.getTaskAttemptID
-    val partition = taskAttemptId.getTaskID.getId
-    val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+
     val compressionExtension = {
       val name = conf.get(OrcRelation.ORC_COMPRESSION)
       OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
@@ -237,12 +234,12 @@ private[orc] class OrcOutputWriter(
     // It has the `.orc` extension at the end because (de)compression tools
     // such as gunzip would not be able to decompress this as the compression
     // is not applied on this whole file but on each "stream" in ORC format.
-    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
+    val filename = s"$fileNamePrefix$compressionExtension.orc"
 
     new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
+      new Path(stagingDir, filename).getFileSystem(conf),
       conf.asInstanceOf[JobConf],
-      new Path(path, filename).toString,
+      new Path(stagingDir, filename).toString,
       Reporter.NULL
     ).asInstanceOf[RecordWriter[NullWritable, Writable]]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 9974451..2eafe18 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -54,11 +54,6 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
   }
 
-  test("write bucketed data to unsupported data source") {
-    val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
-    intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
-  }
-
   test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 5a8a7f0..d504468 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -39,11 +39,11 @@ class CommitFailureTestSource extends SimpleTextSource {
       dataSchema: StructType): OutputWriterFactory =
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new SimpleTextOutputWriter(path, context) {
+        new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) {
           var failed = false
           TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
             failed = true

http://git-wip-us.apache.org/repos/asf/spark/blob/7f9ec19e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 906de6b..9e13b21 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -51,11 +51,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
     SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new SimpleTextOutputWriter(path, context)
+        new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context)
       }
     }
   }
@@ -120,9 +120,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
   }
 }
 
-class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+class SimpleTextOutputWriter(
+    stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext)
+  extends OutputWriter {
   private val recordWriter: RecordWriter[NullWritable, Text] =
-    new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+    new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context)
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map { v =>
@@ -136,19 +138,15 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
   }
 }
 
-class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
-  val numberFormat = NumberFormat.getInstance()
+class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
+  extends TextOutputFormat[NullWritable, Text] {
 
+  val numberFormat = NumberFormat.getInstance()
   numberFormat.setMinimumIntegerDigits(5)
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-    val configuration = context.getConfiguration
-    val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-    val taskAttemptId = context.getTaskAttemptID
-    val split = taskAttemptId.getTaskID.getId
-    val name = FileOutputFormat.getOutputName(context)
-    new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+    new Path(stagingDir, fileNamePrefix)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message