spark-reviews mailing list archives

From liancheng <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-5182] [SQL] Partitioning support for th...
Date Sun, 10 May 2015 03:53:48 GMT
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5526#discussion_r29998597
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -207,3 +246,218 @@ trait InsertableRelation {
     trait CatalystScan {
       def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
     }
    +
    +/**
    + * ::Experimental::
    + * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
    + * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
    + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
    + * executor side.  This instance is used to persist rows to this single output file.
    + */
    +@Experimental
    +abstract class OutputWriter {
    +  /**
    +   * Initializes this [[OutputWriter]] before any rows are persisted.
    +   *
    +   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
    +   *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
    +   *        temporary directories and then merges written files back to the final destination.  In
    +   *        this case, `path` points to a temporary output file under the temporary directory.
    +   * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
    +   *        schema if the corresponding relation is partitioned.
    +   * @param context The Hadoop MapReduce task context.
    +   */
    +  def init(
    +      path: String,
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): Unit = ()
    +
    +  /**
    +   * Persists a single row.  Invoked on the executor side.  When writing to dynamically partitioned
    +   * tables, dynamic partition columns are not included in rows to be written.
    +   */
    +  def write(row: Row): Unit
    +
    +  /**
    +   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    +   * the task output is committed.
    +   */
    +  def close(): Unit = ()
    +}
    +
    +/**
    + * ::Experimental::
    + * A [[BaseRelation]] that abstracts file system based data sources.
    + *
    + * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
    + * filter using selected predicates before producing an RDD containing all matching tuples as
    + * [[Row]] objects. In addition, when reading from Hive-style partitioned tables stored in file
    + * systems, it's able to discover partitioning information from the paths of input directories, and
    + * perform partition pruning before starting to read the data. Subclasses of [[FSBasedRelation]] must
    + * override one of the three `buildScan` methods to implement the read path.
    + *
    + * For the write path, it provides the ability to write to both non-partitioned and partitioned
    + * tables.  Directory layout of the partitioned tables is compatible with Hive.
    + *
    + * @constructor This constructor is for internal use only. The [[PartitionSpec]] argument is for
    + *              implementing metastore table conversion.
    + * @param paths Base paths of this relation.  For partitioned relations, it should be the root
    + *        directories of all partition directories.
    + * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
    + *        [[PartitionSpec]], so that partition discovery can be skipped.
    + */
    +@Experimental
    +abstract class FSBasedRelation private[sql](
    +    val paths: Array[String],
    +    maybePartitionSpec: Option[PartitionSpec])
    +  extends BaseRelation {
    +
    +  /**
    +   * Constructs an [[FSBasedRelation]].
    +   *
    +   * @param paths Base paths of this relation.  For partitioned relations, it should be the root
    +   *        directories of all partition directories.
    +   * @param partitionColumns Partition columns of this relation.
    +   */
    +  def this(paths: Array[String], partitionColumns: StructType) =
    +    this(paths, {
    +      if (partitionColumns.isEmpty) None
    +      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
    +    })
    +
    +  /**
    +   * Constructs an [[FSBasedRelation]].
    +   *
    +   * @param paths Base paths of this relation.  For partitioned relations, it should be the root
    +   *        directories of all partition directories.
    +   */
    +  def this(paths: Array[String]) = this(paths, None)
    +
    +  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
    +
    +  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
    +    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
    +  }.getOrElse {
    +    if (partitionDiscoverEnabled()) {
    +      discoverPartitions()
    +    } else {
    +      PartitionSpec(StructType(Nil), Array.empty[Partition])
    +    }
    +  }
    +
    +  private def partitionDiscoverEnabled() =
    +    sqlContext.conf.getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
    +
    +  private[sql] def partitionSpec: PartitionSpec = _partitionSpec
    +
    +  /**
    +   * Partition columns. Note that they are always nullable.
    +   */
    +  def partitionColumns: StructType = partitionSpec.partitionColumns
    +
    +  private[sql] def refresh(): Unit = {
    +    if (partitionDiscoverEnabled()) {
    +      _partitionSpec = discoverPartitions()
    +    }
    +  }
    +
    +  private def discoverPartitions(): PartitionSpec = {
    +    val basePaths = paths.map(new Path(_))
    +    val leafDirs = basePaths.flatMap { path =>
    +      val fs = path.getFileSystem(hadoopConf)
    +      if (fs.exists(path)) {
    +        SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
    +      } else {
    +        Seq.empty[FileStatus]
    +      }
    +    }.map(_.getPath)
    +
    +    if (leafDirs.nonEmpty) {
    +      PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
    +    } else {
    +      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
    +    }
    +  }
    +
    +  /**
    +   * Schema of this relation.  It consists of columns appearing in [[dataSchema]] and all partition
    +   * columns not appearing in [[dataSchema]].
    +   */
    +  override lazy val schema: StructType = {
    +    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    +    StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
    +      dataSchemaColumnNames.contains(column.name.toLowerCase)
    +    })
    +  }
    +
    +  /**
    +   * Specifies schema of actual data files.  For partitioned relations, if one or more partition
    +   * columns are contained in the data files, they should also appear in `dataSchema`.
    +   */
    +  def dataSchema: StructType
    +
    +  /**
    +   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
    +   * this relation. For partitioned relations, this method is called for each selected partition,
    +   * and builds an `RDD[Row]` containing all rows within that single partition.
    +   *
    +   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
    +   *        relation. For a partitioned relation, it contains paths of all data files in a single
    +   *        selected partition.
    +   */
    +  def buildScan(inputPaths: Array[String]): RDD[Row] = {
    +    throw new RuntimeException(
    +      "At least one buildScan() method should be overridden to read the relation.")
    +  }
    +
    +  /**
    +   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
    +   * this relation. For partitioned relations, this method is called for each selected partition,
    +   * and builds an `RDD[Row]` containing all rows within that single partition.
    +   *
    +   * @param requiredColumns Required columns.
    +   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
    +   *        relation. For a partitioned relation, it contains paths of all data files in a single
    +   *        selected partition.
    +   */
    +  def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
    +    buildScan(inputPaths)
    --- End diff --
    
    The projection is done in `DataSourceStrategies`.
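    
    To make the read path concrete, here is a rough sketch (not part of this PR; the
    `SimpleTextRelation` name and its single `value` column are hypothetical) of a source
    that overrides only the unqualified `buildScan` and leaves the projection to the
    planner:
    
    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.FSBasedRelation
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    // Hypothetical data source: one string column per line of text.  Partition
    // columns, if any, are discovered from Hive-style directory names such as
    // /path/to/table/year=2015/month=05/part-00000 and appended to the schema.
    class SimpleTextRelation(
        paths: Array[String])(
        @transient val sqlContext: SQLContext)
      extends FSBasedRelation(paths) {
    
      // Schema of the data files themselves (partition columns excluded).
      override def dataSchema: StructType =
        StructType(StructField("value", StringType, nullable = true) :: Nil)
    
      // Only the simplest overload is overridden; the richer overloads fall back
      // to this one, and column pruning is applied later by the planner.
      override def buildScan(inputPaths: Array[String]): RDD[Row] =
        sqlContext.sparkContext.textFile(inputPaths.mkString(",")).map(Row(_))
    }
    ```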
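    
    On the write side, a minimal `OutputWriter` might look like the sketch below (again
    hypothetical, just following the `init`/`write`/`close` contract shown in the diff;
    note the required zero-argument constructor):
    
    ```scala
    import java.io.OutputStreamWriter
    
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext
    
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.OutputWriter
    import org.apache.spark.sql.types.StructType
    
    // Hypothetical writer that renders each row as one comma-separated text line.
    class SimpleTextOutputWriter extends OutputWriter {
      private var writer: OutputStreamWriter = _
    
      // Called once per output file on the executor side, before any rows arrive.
      // `path` may point to a temporary location managed by the output committer.
      override def init(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): Unit = {
        val file = new Path(path)
        val fs = file.getFileSystem(context.getConfiguration)
        writer = new OutputStreamWriter(fs.create(file, false), "UTF-8")
      }
    
      // Dynamic partition column values are stripped before rows reach this
      // method, so only the data columns are serialized here.
      override def write(row: Row): Unit =
        writer.write(row.toSeq.mkString("", ",", "\n"))
    
      // Release the stream before the task output is committed.
      override def close(): Unit = writer.close()
    }
    ```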


