spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-15103][SQL] Refactored FileCatalog class to allow StreamFileCatalog to infer partitioning
Date Wed, 04 May 2016 18:03:22 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c59615432 -> e868a15a7


[SPARK-15103][SQL] Refactored FileCatalog class to allow StreamFileCatalog to infer partitioning

## What changes were proposed in this pull request?

File Stream Sink writes the list of written files to a metadata log, and StreamFileCatalog reads
this list of files for processing. However, unlike HDFSFileCatalog, StreamFileCatalog does not
infer partitioning.

This PR enables that by refactoring HDFSFileCatalog to create an abstract class, PartitioningAwareFileCatalog,
which has all the functionality to infer partitions from a list of leaf files (a toy sketch of the idea
follows this list).
- HDFSFileCatalog has been renamed to ListingFileCatalog, and it extends PartitioningAwareFileCatalog
by providing a list of leaf files from recursive directory scanning.
- StreamFileCatalog has been renamed to MetadataLogFileCatalog, and it extends PartitioningAwareFileCatalog
by providing a list of leaf files from the metadata log.
- The above two classes have been moved into their own files, as they are not interfaces that
should be in fileSourceInterfaces.scala.
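
To make the refactoring concrete, here is a toy, self-contained sketch of the inference idea: reading
partition key/value pairs off leaf directory names such as `.../id=1/`. This is illustrative only and not
Spark's actual implementation; the real logic lives in `PartitioningAwareFileCatalog.inferPartitioning` and
`PartitioningUtils.parsePartitions`, which also handle type inference, base paths, and null values.

```scala
// Toy sketch only: infer partition values from "key=value" directory segments.
object PartitionInferenceSketch {
  // e.g. "/output/id=2/day=2016" -> Map(id -> 2, day -> 2016)
  def partitionValues(leafDir: String): Map[String, String] =
    leafDir.split("/").collect {
      case segment if segment.contains("=") =>
        val Array(key, value) = segment.split("=", 2)
        key -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    println(partitionValues("/output/id=1"))          // Map(id -> 1)
    println(partitionValues("/output/id=2/day=2016")) // Map(id -> 2, day -> 2016)
  }
}
```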

## How was this patch tested?
- FileStreamSinkSuite was updated to check that partitioning gets inferred, and that on read the
partitions are pruned correctly based on the query (a usage sketch follows this list).
- Other unit tests are unchanged and pass as expected.
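
For reference, a minimal usage sketch of what the updated test exercises, assuming an available
`sqlContext` and the sink's output path `outputDir` (a string), as in FileStreamSinkSuite; the names
here are illustrative, not new API:

```scala
// With partition inference in place, a batch read of the sink output now carries the
// partition column `id`, and filters on it prune the files that get scanned.
val outputDf = sqlContext.read.parquet(outputDir) // schema: value INT, id INT (id inferred from dirs)
outputDf.filter("id = 1").count()                 // should only touch files under .../id=1/
```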

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12879 from tdas/SPARK-15103.

(cherry picked from commit 0fd3a4748416233f034ec137d95f0a4c8712d396)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e868a15a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e868a15a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e868a15a

Branch: refs/heads/branch-2.0
Commit: e868a15a7a3ce8895092131f45110c27b734bfb7
Parents: c596154
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed May 4 11:02:48 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed May 4 11:03:17 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |   8 +-
 .../datasources/ListingFileCatalog.scala        | 127 +++++++++++
 .../PartitioningAwareFileCatalog.scala          | 155 +++++++++++++
 .../datasources/fileSourceInterfaces.scala      | 215 +------------------
 .../streaming/MetadataLogFileCatalog.scala      |  59 +++++
 .../execution/streaming/StreamFileCatalog.scala |  58 -----
 .../sql/streaming/FileStreamSinkSuite.scala     |  64 +++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   9 +-
 8 files changed, 410 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6114142..618ea3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -136,7 +136,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None)
+      val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
       format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
@@ -258,7 +258,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog = new StreamFileCatalog(sparkSession, basePath)
+        val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -310,8 +310,8 @@ case class DataSource(
             })
         }
 
-        val fileCatalog: FileCatalog =
-          new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema)
+        val fileCatalog =
+          new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema)
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality =

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
new file mode 100644
index 0000000..bdf43e0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
+ * files present in `paths`.
+ *
+ * @param parameters a set of options to control discovery
+ * @param paths a list of paths to scan
+ * @param partitionSchema an optional partition schema that will be used to provide types for the
+ *                        discovered partitions
+ */
+class ListingFileCatalog(
+    sparkSession: SparkSession,
+    override val paths: Seq[Path],
+    parameters: Map[String, String],
+    partitionSchema: Option[StructType])
+  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+      cachedPartitionSpec = inferPartitioning()
+    }
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    cachedLeafFiles
+  }
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    cachedLeafDirToChildrenFiles
+  }
+
+  override def refresh(): Unit = {
+    val files = listLeafFiles(paths)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedPartitionSpec = null
+  }
+
+  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
+    } else {
+      val statuses: Seq[FileStatus] = paths.flatMap { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        logInfo(s"Listing $path on driver")
+        // Dummy jobconf to get to the pathFilter defined in configuration
+        val jobConf = new JobConf(hadoopConf, this.getClass)
+        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+        val statuses = {
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        statuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file system
+          //   implementations don't actually issue RPC for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+          //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
+          //   exceeds the threshold.
+          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+        }
+      }.filterNot { status =>
+        val name = status.getPath.getName
+        HadoopFsRelation.shouldFilterOut(name)
+      }
+
+      val (dirs, files) = statuses.partition(_.isDirectory)
+
+      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
+      if (dirs.isEmpty) {
+        mutable.LinkedHashSet(files: _*)
+      } else {
+        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      }
+    }
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = paths.toSet.hashCode()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
new file mode 100644
index 0000000..9d997d6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StringType, StructType}
+
+
+/**
+ * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
+ * It provides the necessary methods to parse partition data based on a set of files.
+ *
+ * @param parameters a set of options to control partition discovery
+ * @param partitionSchema an optional partition schema that will be used to provide types for the
+ *                        discovered partitions
+ */
+abstract class PartitioningAwareFileCatalog(
+    sparkSession: SparkSession,
+    parameters: Map[String, String],
+    partitionSchema: Option[StructType])
+  extends FileCatalog with Logging {
+
+  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+
+  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
+
+  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+    if (partitionSpec().partitionColumns.isEmpty) {
+      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+    } else {
+      prunePartitions(filters, partitionSpec()).map {
+        case PartitionDirectory(values, path) =>
+          Partition(
+            values,
+            leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
+      }
+    }
+  }
+
+  override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+
+  protected def inferPartitioning(): PartitionSpec = {
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    partitionSchema match {
+      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+        val spec = PartitioningUtils.parsePartitions(
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = false,
+          basePaths = basePaths)
+
+        // Without auto inference, all of the values in the `row` should be null or of StringType,
+        // so we need to cast them into the data type that the user specified.
+        def castPartitionValuesToUserSchema(row: InternalRow) = {
+          InternalRow((0 until row.numFields).map { i =>
+            Cast(
+              Literal.create(row.getUTF8String(i), StringType),
+              userProvidedSchema.fields(i).dataType).eval()
+          }: _*)
+        }
+
+        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+          part.copy(values = castPartitionValuesToUserSchema(part.values))
+        })
+      case _ =>
+        PartitioningUtils.parsePartitions(
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+          basePaths = basePaths)
+    }
+  }
+
+  private def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = InterpretedPredicate.create(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+
+      val selected = partitions.filter {
+        case PartitionDirectory(values, _) => boundPredicate(values)
+      }
+      logInfo {
+        val total = partitions.length
+        val selectedSize = selected.length
+        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
+      }
+
+      selected
+    } else {
+      partitions
+    }
+  }
+
+  /**
+   * Contains a set of paths that are considered as the base dirs of the input datasets.
+   * The partitioning discovery logic will make sure it will stop when it reaches any
+   * base path. By default, the paths of the dataset provided by users will be base paths.
+   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
+   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
+   * `something`. If users want to override the basePath, they can set `basePath` in the options
+   * to pass the new base path to the data source.
+   * For the above example, if the user-provided base path is `/path/`, the returned
+   * DataFrame will have the column of `something`.
+   */
+  private def basePaths: Set[Path] = {
+    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
+    userDefinedBasePath.getOrElse {
+      // If the user does not provide basePath, we will just use paths.
+      paths.toSet
+    }.map { hdfsPath =>
+      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 24e2bf6..c87e672 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -304,232 +304,31 @@ case class Partition(values: InternalRow, files: Seq[FileStatus])
  * as the partitioning characteristics of those files.
  */
 trait FileCatalog {
+
+  /** Returns the list of input paths from which the catalog will get files. */
   def paths: Seq[Path]
 
+  /** Returns the specification of the partitions inferred from the data. */
   def partitionSpec(): PartitionSpec
 
   /**
   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
-   * unpartitioned, this will return a single partition with not partition values.
+   * unpartitioned, this will return a single partition with no partition values.
   *
-   * @param filters the filters used to prune which partitions are returned.  These filters must
+   * @param filters The filters used to prune which partitions are returned.  These filters must
    *                only refer to partition columns and this method will only return files
    *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
    *                filters will not need to be evaluated again on the returned data.
    */
   def listFiles(filters: Seq[Expression]): Seq[Partition]
 
+  /** Returns all the valid files. */
   def allFiles(): Seq[FileStatus]
 
-  def getStatus(path: Path): Array[FileStatus]
-
+  /** Refresh the file listing */
   def refresh(): Unit
 }
 
-/**
- * A file catalog that caches metadata gathered by scanning all the files present in `paths`
- * recursively.
- *
- * @param parameters as set of options to control discovery
- * @param paths a list of paths to scan
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
- */
-class HDFSFileCatalog(
-    sparkSession: SparkSession,
-    parameters: Map[String, String],
-    override val paths: Seq[Path],
-    partitionSchema: Option[StructType])
-  extends FileCatalog with Logging {
-
-  private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
-
-  var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
-  var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
-  var cachedPartitionSpec: PartitionSpec = _
-
-  def partitionSpec(): PartitionSpec = {
-    if (cachedPartitionSpec == null) {
-      cachedPartitionSpec = inferPartitioning(partitionSchema)
-    }
-
-    cachedPartitionSpec
-  }
-
-  refresh()
-
-  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
-    if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
-    } else {
-      prunePartitions(filters, partitionSpec()).map {
-        case PartitionDirectory(values, path) =>
-          Partition(
-            values,
-            getStatus(path).filterNot(_.getPath.getName startsWith "_"))
-      }
-    }
-  }
-
-  protected def prunePartitions(
-      predicates: Seq[Expression],
-      partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
-    val PartitionSpec(partitionColumns, partitions) = partitionSpec
-    val partitionColumnNames = partitionColumns.map(_.name).toSet
-    val partitionPruningPredicates = predicates.filter {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-    }
-
-    if (partitionPruningPredicates.nonEmpty) {
-      val predicate = partitionPruningPredicates.reduce(expressions.And)
-
-      val boundPredicate = InterpretedPredicate.create(predicate.transform {
-        case a: AttributeReference =>
-          val index = partitionColumns.indexWhere(a.name == _.name)
-          BoundReference(index, partitionColumns(index).dataType, nullable = true)
-      })
-
-      val selected = partitions.filter {
-        case PartitionDirectory(values, _) => boundPredicate(values)
-      }
-      logInfo {
-        val total = partitions.length
-        val selectedSize = selected.length
-        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
-        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
-      }
-
-      selected
-    } else {
-      partitions
-    }
-  }
-
-  def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
-
-  def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path)
-
-  private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
-    } else {
-      val statuses: Seq[FileStatus] = paths.flatMap { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        logInfo(s"Listing $path on driver")
-        // Dummy jobconf to get to the pathFilter defined in configuration
-        val jobConf = new JobConf(hadoopConf, this.getClass)
-        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-          //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-          //   exceeds the threshold.
-          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
-      }
-    }
-  }
-
-  def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
-    // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.keys.toSeq
-    schema match {
-      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = false,
-          basePaths = basePaths)
-
-        // Without auto inference, all of value in the `row` should be null or in StringType,
-        // we need to cast into the data type that user specified.
-        def castPartitionValuesToUserSchema(row: InternalRow) = {
-          InternalRow((0 until row.numFields).map { i =>
-            Cast(
-              Literal.create(row.getUTF8String(i), StringType),
-              userProvidedSchema.fields(i).dataType).eval()
-          }: _*)
-        }
-
-        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
-          part.copy(values = castPartitionValuesToUserSchema(part.values))
-        })
-      case _ =>
-        PartitioningUtils.parsePartitions(
-          leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
-          basePaths = basePaths)
-    }
-  }
-
-  /**
-   * Contains a set of paths that are considered as the base dirs of the input datasets.
-   * The partitioning discovery logic will make sure it will stop when it reaches any
-   * base path. By default, the paths of the dataset provided by users will be base paths.
-   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
-   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
-   * `something`. If users want to override the basePath. They can set `basePath` in the options
-   * to pass the new base path to the data source.
-   * For the above example, if the user-provided base path is `/path/`, the returned
-   * DataFrame will have the column of `something`.
-   */
-  private def basePaths: Set[Path] = {
-    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
-    userDefinedBasePath.getOrElse {
-      // If the user does not provide basePath, we will just use paths.
-      paths.toSet
-    }.map { hdfsPath =>
-      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    }
-  }
-
-  def refresh(): Unit = {
-    val files = listLeafFiles(paths)
-
-    leafFiles.clear()
-    leafDirToChildrenFiles.clear()
-
-    leafFiles ++= files.map(f => f.getPath -> f)
-    leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
-
-    cachedPartitionSpec = null
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet
-    case _ => false
-  }
-
-  override def hashCode(): Int = paths.toSet.hashCode()
-}
 
 /**
  * Helper methods for gathering metadata from HDFS.

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
new file mode 100644
index 0000000..20ade12
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources._
+
+
+/**
+ * A [[FileCatalog]] that generates the list of files to process by reading them from the
+ * metadata log files generated by the [[FileStreamSink]].
+ */
+class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
+  extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
+
+  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  logInfo(s"Reading streaming file log from $metadataDirectory")
+  private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString)
+  private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
+  private var cachedPartitionSpec: PartitionSpec = _
+
+  override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
+  }
+
+  override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+  }
+
+  override def paths: Seq[Path] = path :: Nil
+
+  override def refresh(): Unit = { }
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+      cachedPartitionSpec = inferPartitioning()
+    }
+    cachedPartitionSpec
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
deleted file mode 100644
index 4f69971..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{FileCatalog, Partition, PartitionSpec}
-import org.apache.spark.sql.types.StructType
-
-class StreamFileCatalog(sparkSession: SparkSession, path: Path) extends FileCatalog with Logging {
-  val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
-  logInfo(s"Reading streaming file log from $metadataDirectory")
-  val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString)
-  val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-
-  override def paths: Seq[Path] = path :: Nil
-
-  override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil)
-
-  /**
-   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
-   * unpartitioned, this will return a single partition with not partition values.
-   *
-   * @param filters the filters used to prune which partitions are returned.  These filters must
-   *                only refer to partition columns and this method will only return files
-   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
-   *                filters will not need to be evaluated again on the returned data.
-   */
-  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
-    Partition(InternalRow.empty, allFiles()) :: Nil
-
-  override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path)
-
-  override def refresh(): Unit = {}
-
-  override def allFiles(): Seq[FileStatus] = {
-    metadataLog.allFiles().map(_.toFileStatus)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 609ca97..e937fc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -22,12 +22,14 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
 
-import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.execution.datasources.parquet
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
@@ -147,7 +149,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]")
{
+  test("FileStreamSink - partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()
 
@@ -157,7 +159,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
     var query: ContinuousQuery = null
 
     try {
-       query =
+      query =
         ds.map(i => (i, i * 1000))
           .toDF("id", "value")
           .write
@@ -171,12 +173,58 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
         query.processAllAvailable()
       }
 
-      // TODO (tdas): Test partition column can be read or not
       val outputDf = sqlContext.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
+
+      // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has
+      // been inferred
+      val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
+        case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
+          baseRelation.asInstanceOf[HadoopFsRelation]
+      }
+      assert(hadoopdFsRelations.size === 1)
+      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog])
+      assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+      assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+      // Verify the data is correctly read
       checkDataset(
-        outputDf.as[Int],
-        1000, 2000, 3000)
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
+
+      /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+      def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+        val getFileScanRDD = df.queryExecution.executedPlan.collect {
+          case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
+            scan.rdd.asInstanceOf[FileScanRDD]
+        }.headOption.getOrElse {
+          fail(s"No FileScan in query\n${df.queryExecution}")
+        }
+        func(getFileScanRDD.filePartitions)
+      }
 
+      // Read without pruning
+      checkFileScanPartitions(outputDf) { partitions =>
+        // There should be as many distinct partition values as there are distinct ids
+        assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+      }
+
+      // Read with pruning, should read only files in partition dir id=1
+      checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
+        val filesToBeRead = partitions.flatMap(_.files)
+        assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+        assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+      }
+
+      // Read with pruning, should read only files in partition dir id=1 and id=2
+      checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
+        val filesToBeRead = partitions.flatMap(_.files)
+        assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+        assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
+      }
     } finally {
       if (query != null) {
         query.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/e868a15a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0b24d35..7a799b6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -536,17 +536,12 @@ private[hive] class MetaStoreFileCatalog(
     sparkSession: SparkSession,
     paths: Seq[Path],
     partitionSpecFromHive: PartitionSpec)
-  extends HDFSFileCatalog(
+  extends ListingFileCatalog(
     sparkSession,
-    Map.empty,
     paths,
+    Map.empty,
     Some(partitionSpecFromHive.partitionColumns)) {
 
-  override def getStatus(path: Path): Array[FileStatus] = {
-    val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    fs.listStatus(path)
-  }
-
   override def partitionSpec(): PartitionSpec = partitionSpecFromHive
 }
 



