spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/2] spark git commit: [SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API
Date Tue, 12 May 2015 17:32:41 GMT
Repository: spark
Updated Branches:
  refs/heads/master 831504cf6 -> 0595b6de8


http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ca53dcd..5e010d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,11 +17,19 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * ::DeveloperApi::
@@ -78,6 +86,41 @@ trait SchemaRelationProvider {
       schema: StructType): BaseRelation
 }
 
+/**
+ * ::DeveloperApi::
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema and partitioned columns.  When Spark SQL is given a DDL operation with a
+ * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
+ * schema, and an optional list of partition columns, this interface is used to pass in the
+ * parameters specified by a user.
+ *
+ * Users may specify the fully qualified class name of a given data source.  When that class is
+ * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
+ * less verbose invocation.  For example, 'org.apache.spark.sql.json' would resolve to the
+ * data source 'org.apache.spark.sql.json.DefaultSource'
+ *
+ * A new instance of this class with be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is
+ * that users need to provide a schema and a (possibly empty) list of partition columns when
+ * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]],
+ * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified
+ * schemas, and accessing partitioned relations.
+ */
+trait FSBasedRelationProvider {
+  /**
+   * Returns a new base relation with the given parameters, a user defined schema, and a list of
+   * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
+   * is enforced by the Map that is passed to the function.
+   */
+  def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): FSBasedRelation
+}
+
 @DeveloperApi
 trait CreatableRelationProvider {
   /**
@@ -207,3 +250,235 @@ trait InsertableRelation {
 trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
 }
+
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
+ * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side.  This instance is used to persist rows to this single output file.
+ */
+@Experimental
+abstract class OutputWriter {
+  /**
+   * Initializes this [[OutputWriter]] before any rows are persisted.
+   *
+   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
+   *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
+   *        temporary directories and then merge written files back to the final destination.  In
+   *        this case, `path` points to a temporary output file under the temporary directory.
+   * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
+   *        schema if the corresponding relation is partitioned.
+   * @param context The Hadoop MapReduce task context.
+   */
+  def init(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): Unit = ()
+
+  /**
+   * Persists a single row.  Invoked on the executor side.  When writing to dynamically partitioned
+   * tables, dynamic partition columns are not included in rows to be written.
+   */
+  def write(row: Row): Unit
+
+  /**
+   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
+   * the task output is committed.
+   */
+  def close(): Unit
+}
+
+/**
+ * ::Experimental::
+ * A [[BaseRelation]] that provides much of the common code required for formats that store their
+ * data to an HDFS compatible filesystem.
+ *
+ * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
+ * filter using selected predicates before producing an RDD containing all matching tuples as
+ * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
+ * systems, it's able to discover partitioning information from the paths of input directories, and
+ * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
+ * override one of the three `buildScan` methods to implement the read path.
+ *
+ * For the write path, it provides the ability to write to both non-partitioned and partitioned
+ * tables.  Directory layout of the partitioned tables is compatible with Hive.
+ *
+ * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
+ *              implementing metastore table conversion.
+ * @param paths Base paths of this relation.  For partitioned relations, it should be the root
+ *        directories of all partition directories.
+ * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *        [[PartitionSpec]], so that partition discovery can be skipped.
+ */
+@Experimental
+abstract class FSBasedRelation private[sql](
+    val paths: Array[String],
+    maybePartitionSpec: Option[PartitionSpec])
+  extends BaseRelation {
+
+  /**
+   * Constructs an [[FSBasedRelation]].
+   *
+   * @param paths Base paths of this relation.  For partitioned relations, it should be either root
+   *        directories of all partition directories.
+   * @param partitionColumns Partition columns of this relation.
+   */
+  def this(paths: Array[String], partitionColumns: StructType) =
+    this(paths, {
+      if (partitionColumns.isEmpty) None
+      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
+    })
+
+  /**
+   * Constructs an [[FSBasedRelation]].
+   *
+   * @param paths Base paths of this relation.  For partitioned relations, it should be root
+   *        directories of all partition directories.
+   */
+  def this(paths: Array[String]) = this(paths, None)
+
+  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+  private val codegenEnabled = sqlContext.conf.codegenEnabled
+
+  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
+    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
+  }.getOrElse {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
+      discoverPartitions()
+    } else {
+      PartitionSpec(StructType(Nil), Array.empty[Partition])
+    }
+  }
+
+  private[sql] def partitionSpec: PartitionSpec = _partitionSpec
+
+  /**
+   * Partition columns. Note that they are always nullable.
+   */
+  def partitionColumns: StructType = partitionSpec.partitionColumns
+
+  private[sql] def refresh(): Unit = {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
+      _partitionSpec = discoverPartitions()
+    }
+  }
+
+  private def discoverPartitions(): PartitionSpec = {
+    val basePaths = paths.map(new Path(_))
+    val leafDirs = basePaths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
+        .filter(_.isDir)
+        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
+        .getOrElse(Seq.empty[FileStatus])
+    }.map(_.getPath)
+
+    if (leafDirs.nonEmpty) {
+      PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+    } else {
+      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
+    }
+  }
+
+  /**
+   * Schema of this relation.  It consists of columns appearing in [[dataSchema]] and all partition
+   * columns not appearing in [[dataSchema]].
+   */
+  override lazy val schema: StructType = {
+    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
+    StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
+      dataSchemaColumnNames.contains(column.name.toLowerCase)
+    })
+  }
+
+  /**
+   * Specifies schema of actual data files.  For partitioned relations, if one or more partitioned
+   * columns are contained in the data files, they should also appear in `dataSchema`.
+   */
+  def dataSchema: StructType
+
+  /**
+   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
+   * this relation. For partitioned relations, this method is called for each selected partition,
+   * and builds an `RDD[Row]` containing all rows within that single partition.
+   *
+   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   *        relation. For a partitioned relation, it contains paths of all data files in a single
+   *        selected partition.
+   */
+  def buildScan(inputPaths: Array[String]): RDD[Row] = {
+    throw new RuntimeException(
+      "At least one buildScan() method should be overridden to read the relation.")
+  }
+
+  /**
+   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
+   * this relation. For partitioned relations, this method is called for each selected partition,
+   * and builds an `RDD[Row]` containing all rows within that single partition.
+   *
+   * @param requiredColumns Required columns.
+   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   *        relation. For a partitioned relation, it contains paths of all data files in a single
+   *        selected partition.
+   */
+  def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+    // Yeah, to workaround serialization...
+    val dataSchema = this.dataSchema
+    val codegenEnabled = this.codegenEnabled
+
+    val requiredOutput = requiredColumns.map { col =>
+      val field = dataSchema(col)
+      BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
+    }.toSeq
+
+    buildScan(inputPaths).mapPartitions { rows =>
+      val buildProjection = if (codegenEnabled) {
+        GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
+      } else {
+        () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
+      }
+
+      val mutableProjection = buildProjection()
+      rows.map(mutableProjection)
+    }
+  }
+
+  /**
+   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
+   * this relation. For partitioned relations, this method is called for each selected partition,
+   * and builds an `RDD[Row]` containing all rows within that single partition.
+   *
+   * @param requiredColumns Required columns.
+   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
+   *        will all be evaluated again. This means it is safe to use them with methods that produce
+   *        false positives such as filtering partitions based on a bloom filter.
+   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   *        relation. For a partitioned relation, it contains paths of all data files in a single
+   *        selected partition.
+   */
+  def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+    buildScan(requiredColumns, inputPaths)
+  }
+
+  /**
+   * Client side preparation for data writing can be put here.  For example, user defined output
+   * committer can be configured here.
+   *
+   * Note that the only side effect expected here is mutating `job` via its setters.  Especially,
+   * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
+   * may cause unexpected behaviors.
+   */
+  def prepareForWrite(job: Job): Unit = ()
+
+  /**
+   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
+   * file on the executor side.
+   */
+  def outputWriterClass: Class[_ <: OutputWriter]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 6ed68d1..aad1d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -101,13 +101,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           }
         }
 
-      case i @ logical.InsertIntoTable(
-        l: LogicalRelation, partition, query, overwrite, ifNotExists)
-          if !l.isInstanceOf[InsertableRelation] =>
+      case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
-      case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
         if (catalog.tableExists(Seq(tableName))) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index b7561ce..bea568e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.parquet.ParquetRelation2._
+import org.apache.spark.sql.sources.PartitioningUtils._
+import org.apache.spark.sql.sources.{Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLContext}

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 54f2f3c..4e54b2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.{IOException, File}
+import java.io.{File, IOException}
 
-import org.apache.spark.sql.AnalysisException
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.util.Utils
 
 class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbf48ef..d754c8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,25 +19,24 @@ package org.apache.spark.sql.hive
 
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -98,6 +97,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           ResolvedDataSource(
             hive,
             userSpecifiedSchema,
+            Array.empty[String],
             table.properties("spark.sql.sources.provider"),
             options)
 
@@ -438,6 +438,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             desc.name,
             hive.conf.defaultDataSourceName,
             temporary = false,
+            Array.empty[String],
             mode,
             options = Map.empty[String, String],
             child

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index be9249a..d46a127 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -221,14 +221,14 @@ private[hive] trait HiveStrategies {
   object HiveDDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case CreateTableUsing(
-      tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
+          tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
         ExecutedCommand(
           CreateMetastoreDataSource(
             tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
 
-      case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
+      case CreateTableUsingAsSelect(tableName, provider, false, partitionCols, mode, opts, query) =>
         val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
+          CreateMetastoreDataSourceAsSelect(tableName, provider, partitionCols, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
 
       case _ => Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index abab1a2..8e405e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -158,6 +158,7 @@ private[hive]
 case class CreateMetastoreDataSourceAsSelect(
     tableName: String,
     provider: String,
+    partitionColumns: Array[String],
     mode: SaveMode,
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
@@ -189,12 +190,12 @@ case class CreateMetastoreDataSourceAsSelect(
           return Seq.empty[Row]
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
-          val resolved =
-            ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath)
+          val resolved = ResolvedDataSource(
+            sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
           EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
-            case l @ LogicalRelation(i: InsertableRelation) =>
-              if (i != createdRelation.relation) {
+            case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+              if (l.relation != createdRelation.relation) {
                 val errorDescription =
                   s"Cannot append to table $tableName because the resolved relation does not " +
                   s"match the existing relation of $tableName. " +
@@ -202,14 +203,13 @@ case class CreateMetastoreDataSourceAsSelect(
                   s"table $tableName and using its data source and options."
                 val errorMessage =
                   s"""
-                |$errorDescription
-                |== Relations ==
-                |${sideBySide(
-                s"== Expected Relation ==" ::
-                  l.toString :: Nil,
-                s"== Actual Relation ==" ::
-                  createdRelation.toString :: Nil).mkString("\n")}
-              """.stripMargin
+                     |$errorDescription
+                     |== Relations ==
+                     |${sideBySide(
+                        s"== Expected Relation ==" :: l.toString :: Nil,
+                        s"== Actual Relation ==" :: createdRelation.toString :: Nil
+                      ).mkString("\n")}
+                   """.stripMargin
                 throw new AnalysisException(errorMessage)
               }
               existingSchema = Some(l.schema)
@@ -234,7 +234,8 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     // Create the relation based on the data of df.
-    val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
+    val resolved =
+      ResolvedDataSource(sqlContext, provider, partitionColumns, mode, optionsWithPath, df)
 
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8398da2..cbc381c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -204,7 +204,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
           if (string == null || string.isEmpty) {
             defaultPartName
           } else {
-            FileUtils.escapePathName(string)
+            FileUtils.escapePathName(string, defaultPartName)
           }
         s"/$col=$colString"
       }.mkString

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
new file mode 100644
index 0000000..415b1cd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.types._
+
+// TODO Don't extend ParquetTest
+// This test suite extends ParquetTest for some convenient utility methods. These methods should be
+// moved to some more general places, maybe QueryTest.
+class FSBasedRelationSuite extends QueryTest with ParquetTest {
+  override val sqlContext: SQLContext = TestHive
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  val dataSchema =
+    StructType(
+      Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", StringType, nullable = false)))
+
+  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
+
+  val partitionedTestDF1 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF2 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
+
+  def checkQueries(df: DataFrame): Unit = {
+    // Selects everything
+    checkAnswer(
+      df,
+      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+
+    // Simple filtering and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 === 2),
+      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
+
+    // Simple projection and filtering
+    checkAnswer(
+      df.filter('a > 1).select('b, 'a + 1),
+      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
+
+    // Simple projection and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
+
+    // Self-join
+    df.registerTempTable("t")
+    withTempTable("t") {
+      checkAnswer(
+        sql(
+          """SELECT l.a, r.b, l.p1, r.p2
+            |FROM t l JOIN t r
+            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
+          """.stripMargin),
+        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Overwrite") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite)
+
+      checkAnswer(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        testDF.collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Append") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Append)
+
+      checkAnswer(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)).orderBy("a"),
+        testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        testDF.save(
+          path = file.getCanonicalPath,
+          source = classOf[SimpleTextSource].getCanonicalName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Ignore") {
+    withTempDir { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  test("save()/load() - partitioned table - simple queries") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.ErrorIfExists,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkQueries(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)))
+    }
+  }
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+
+  test("save()/load() - partitioned table - Overwrite") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append - new partition values") {
+    withTempPath { file =>
+      partitionedTestDF1.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF2.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        partitionedTestDF.save(
+          source = classOf[SimpleTextSource].getCanonicalName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("path" -> file.getCanonicalPath),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("save()/load() - partitioned table - Ignore") {
+    withTempDir { file =>
+      partitionedTestDF.save(
+        path = file.getCanonicalPath,
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sql(s"DROP TABLE $tableName")
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Append") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite)
+
+    testDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Append)
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        testDF.saveAsTable(
+          tableName = "t",
+          source = classOf[SimpleTextSource].getCanonicalName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Ignore)
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - simple queries") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkQueries(table("t"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Overwrite") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF2.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = classOf[SimpleTextSource].getCanonicalName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    // Using only a subset of all partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1"))
+    }
+
+    // Using different order of partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p2", "p1"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        partitionedTestDF.saveAsTable(
+          tableName = "t",
+          source = classOf[SimpleTextSource].getCanonicalName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Ignore,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("Hadoop style globbing") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      val df = load(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        options = Map(
+          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+          "dataSchema" -> dataSchema.json))
+
+      val expectedPaths = Set(
+        s"${file.getCanonicalFile}/p1=1/p2=foo",
+        s"${file.getCanonicalFile}/p1=2/p2=foo",
+        s"${file.getCanonicalFile}/p1=1/p2=bar",
+        s"${file.getCanonicalFile}/p1=2/p2=bar"
+      ).map { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      }
+
+      println(df.queryExecution)
+
+      val actualPaths = df.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: FSBasedRelation) =>
+          relation.paths.toSet
+      }.getOrElse {
+        fail("Expect an FSBasedRelation, but none could be found")
+      }
+
+      assert(actualPaths === expectedPaths)
+      checkAnswer(df, partitionedTestDF.collect())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0595b6de/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
new file mode 100644
index 0000000..8801aba
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.NumberFormat
+import java.util.UUID
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * A simple example [[FSBasedRelationProvider]].
+ */
+class SimpleTextSource extends FSBasedRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): FSBasedRelation = {
+    val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
+    new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+  }
+}
+
+class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
+  val numberFormat = NumberFormat.getInstance()
+
+  numberFormat.setMinimumIntegerDigits(5)
+  numberFormat.setGroupingUsed(false)
+
+  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+    val split = context.getTaskAttemptID.getTaskID.getId
+    val name = FileOutputFormat.getOutputName(context)
+    new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+  }
+}
+
+class SimpleTextOutputWriter extends OutputWriter {
+  private var recordWriter: RecordWriter[NullWritable, Text] = _
+  private var taskAttemptContext: TaskAttemptContext = _
+
+  override def init(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): Unit = {
+    recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+    taskAttemptContext = context
+  }
+
+  override def write(row: Row): Unit = {
+    val serialized = row.toSeq.map(_.toString).mkString(",")
+    recordWriter.write(null, new Text(serialized))
+  }
+
+  override def close(): Unit = recordWriter.close(taskAttemptContext)
+}
+
+/**
+ * A simple example [[FSBasedRelation]], used for testing purposes.  Data are stored as comma
+ * separated string lines.  When scanning data, schema must be explicitly provided via data source
+ * option `"dataSchema"`.
+ */
+class SimpleTextRelation(
+    paths: Array[String],
+    val maybeDataSchema: Option[StructType],
+    partitionsSchema: StructType,
+    parameters: Map[String, String])(
+    @transient val sqlContext: SQLContext)
+  extends FSBasedRelation(paths, partitionsSchema) {
+
+  import sqlContext.sparkContext
+
+  override val dataSchema: StructType =
+    maybeDataSchema.getOrElse(DataType.fromJson(parameters("dataSchema")).asInstanceOf[StructType])
+
+  override def equals(other: Any): Boolean = other match {
+    case that: SimpleTextRelation =>
+      this.paths.sameElements(that.paths) &&
+        this.maybeDataSchema == that.maybeDataSchema &&
+        this.dataSchema == that.dataSchema &&
+        this.partitionColumns == that.partitionColumns
+
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hashCode(paths, maybeDataSchema, dataSchema)
+
+  override def outputWriterClass: Class[_ <: OutputWriter] =
+    classOf[SimpleTextOutputWriter]
+
+  override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+    val fields = dataSchema.map(_.dataType)
+
+    sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+      Row(record.split(",").zip(fields).map { case (value, dataType) =>
+        Cast(Literal(value), dataType).eval()
+      }: _*)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message