spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery
Date Mon, 20 Jul 2015 23:43:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master dac7dbf5a -> a1064df0e


[SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery

This PR accelerates Parquet schema discovery and `HadoopFsRelation` partition discovery by the following means:

- Turning off schema merging by default

  Schema merging is not the most common use case, but it requires reading the footers of all Parquet
part-files and can be very slow.
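
  Schema merging can still be requested explicitly when it is needed.  A minimal sketch of the two
ways to turn it back on (the table path below is illustrative):

```scala
// Per-read override of the new default via the data source option:
val merged = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("/path/to/parquet/table")

// Or session-wide via SQLConf:
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
```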

- Avoiding `FileSystem.globStatus()` call when possible

  `FileSystem.globStatus()` may issue multiple synchronous RPC calls, and can be very slow
(especially on S3).  This PR adds `SparkHadoopUtil.globPathIfNecessary()`, which only issues RPC
calls when the path contains glob-pattern character(s) (`{}[]*?\`).

  This is especially useful when converting a metastore Parquet table with lots of partitions,
since Spark SQL adds all partition directories as input paths, and currently a `globStatus` call
is issued for each input path sequentially.
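
  A minimal usage sketch of the new helper (the paths are made up for illustration):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// A path containing glob metacharacters still goes through globPath() and its RPCs...
SparkHadoopUtil.get.globPathIfNecessary(new Path("hdfs://nn/warehouse/logs/date=2015-07-*"))

// ...while a plain path is returned as Seq(path) without any globStatus() call.
SparkHadoopUtil.get.globPathIfNecessary(new Path("hdfs://nn/warehouse/logs/date=2015-07-20"))
```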

- Listing leaf files in parallel when the number of input paths exceeds a threshold

  Listing leaf files is required by partition discovery.  Currently it is done on the driver side
and can be slow when there are lots of (nested) directories, since each `FileSystem.listStatus()`
call issues an RPC.  In this PR, leaf files are listed in a BFS manner, and we fall back to a
Spark job once the number of directories to be listed exceeds a threshold.

  The threshold is controlled by the `SQLConf` option `spark.sql.sources.parallelPartitionDiscovery.threshold`,
which defaults to 32.
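
  For example, to raise the cut-over point for a session (the value 128 is only illustrative):

```scala
sqlContext.setConf("spark.sql.sources.parallelPartitionDiscovery.threshold", "128")
```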

- Discovering Parquet schema in parallel

  Currently, schema merging is also done on the driver side and requires reading the footers of all
part-files.  This PR uses a Spark job to do schema merging.  Together with task-side metadata
reading in Parquet 1.7.0, no footers are read on the driver side anymore.
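
  A rough end-to-end sketch of the user-visible behavior (paths and column names are illustrative;
with `mergeSchema` enabled, the merge below is now computed by executors rather than by footer
reads on the driver):

```scala
import sqlContext.implicits._

// Two part-directories with different but compatible schemas.
sqlContext.range(3).select('id as 'a).write.parquet("/tmp/t/part=1")
sqlContext.range(3).select('id as 'a, 'id as 'b).write.parquet("/tmp/t/part=2")

// The merged schema contains a, b, and the discovered partition column `part`.
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/t")
merged.printSchema()
```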

Author: Cheng Lian <lian@databricks.com>

Closes #7396 from liancheng/accel-parquet and squashes the following commits:

5598efc [Cheng Lian] Uses ParquetInputFormat[InternalRow] instead of ParquetInputFormat[Row]
ff32cd0 [Cheng Lian] Excludes directories while listing leaf files
3c580f1 [Cheng Lian] Fixes test failure caused by making "mergeSchema" default to "false"
b1646aa [Cheng Lian] Should allow empty input paths
32e5f0d [Cheng Lian] Moves schema merging to executor side


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1064df0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1064df0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1064df0

Branch: refs/heads/master
Commit: a1064df0ee3daf496800be84293345a10e1497d9
Parents: dac7dbf
Author: Cheng Lian <lian@databricks.com>
Authored: Mon Jul 20 16:42:43 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Mon Jul 20 16:42:43 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |   8 +
 .../org/apache/spark/sql/DataFrameReader.scala  |  12 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  10 +-
 .../sql/parquet/ParquetTableOperations.scala    |  14 +-
 .../apache/spark/sql/parquet/newParquet.scala   | 158 +++++++++++++------
 .../org/apache/spark/sql/sources/ddl.scala      |   8 +-
 .../apache/spark/sql/sources/interfaces.scala   | 120 +++++++++++---
 .../ParquetPartitionDiscoverySuite.scala        |  18 ++-
 8 files changed, 258 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6b14d40..e06b06e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -239,6 +239,14 @@ class SparkHadoopUtil extends Logging {
     }.getOrElse(Seq.empty[Path])
   }
 
+  def globPathIfNecessary(pattern: Path): Seq[Path] = {
+    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
+      globPath(pattern)
+    } else {
+      Seq(pattern)
+    }
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9b23df4..0e37ad3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.Partition
+import org.apache.spark.{Logging, Partition}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
  * @since 1.4.0
  */
 @Experimental
-class DataFrameReader private[sql](sqlContext: SQLContext) {
+class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 
   /**
    * Specifies the input data source format.
@@ -251,7 +251,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
     if (paths.isEmpty) {
       sqlContext.emptyDataFrame
     } else {
-      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+      val globbedPaths = paths.flatMap { path =>
+        val hdfsPath = new Path(path)
+        val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      }.toArray
+
       sqlContext.baseRelationToDataFrame(
         new ParquetRelation2(
           globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 84d3271..78c780b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -242,7 +242,7 @@ private[spark] object SQLConf {
     doc = "Whether the query analyzer should be case sensitive or not.")
 
   val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
-    defaultValue = Some(true),
+    defaultValue = Some(false),
     doc = "When true, the Parquet data source merges schemas collected from all data files, " +
           "otherwise the schema is picked from the summary file or a random data file " +
           "if no summary file is available.")
@@ -376,6 +376,11 @@ private[spark] object SQLConf {
   val OUTPUT_COMMITTER_CLASS =
     stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
+  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
+    key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
+    defaultValue = Some(32),
+    doc = "<TODO>")
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = booleanConf(
@@ -495,6 +500,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
+  private[spark] def parallelPartitionDiscoveryThreshold: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 9058b09..28cba5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -426,6 +426,7 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   }
 }
 
+// TODO Removes this class after removing old Parquet support code
 /**
  * We extend ParquetInputFormat in order to have more control over which
  * RecordFilter we want to use.
@@ -433,8 +434,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {
 
-  private var fileStatuses = Map.empty[Path, FileStatus]
-
   override def createRecordReader(
       inputSplit: InputSplit,
       taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
@@ -455,17 +454,6 @@ private[parquet] class FilteringParquetRowInputFormat
 
 }
 
-private[parquet] object FilteringParquetRowInputFormat {
-  private val footerCache = CacheBuilder.newBuilder()
-    .maximumSize(20000)
-    .build[FileStatus, Footer]()
-
-  private val blockLocationCache = CacheBuilder.newBuilder()
-    .maximumSize(20000)
-    .expireAfterWrite(15, TimeUnit.MINUTES)  // Expire locations since HDFS files might move
-    .build[FileStatus, Array[BlockLocation]]()
-}
-
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 01dd6f4..e683eb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -22,7 +22,7 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.util.Try
+import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -31,12 +31,11 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
@@ -278,19 +277,13 @@ private[sql] class ParquetRelation2(
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
-
     Utils.withDummyCallSite(sqlContext.sparkContext) {
-      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
-      // and footers. Especially when a global arbitrative schema (either from metastore or data
-      // source DDL) is available.
       new SqlNewHadoopRDD(
         sc = sqlContext.sparkContext,
         broadcastedConf = broadcastedConf,
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
         keyClass = classOf[Void],
         valueClass = classOf[InternalRow]) {
 
@@ -306,12 +299,6 @@ private[sql] class ParquetRelation2(
             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
-        @transient val cachedFooters = footers.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
-        }.toSeq
-
         private def escapePathUserInfo(path: Path): Path = {
           val uri = path.toUri
           new Path(new URI(
@@ -321,13 +308,10 @@ private[sql] class ParquetRelation2(
 
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = if (cacheMetadata) {
-            new FilteringParquetRowInputFormat {
-              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+          val inputFormat = new ParquetInputFormat[InternalRow] {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
+              if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
             }
-          } else {
-            new FilteringParquetRowInputFormat
           }
 
           val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
@@ -348,9 +332,6 @@ private[sql] class ParquetRelation2(
     // `FileStatus` objects of all "_common_metadata" files.
     private var commonMetadataStatuses: Array[FileStatus] = _
 
-    // Parquet footer cache.
-    var footers: Map[Path, Footer] = _
-
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
 
@@ -376,20 +357,6 @@ private[sql] class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = {
-        val conf = SparkHadoopUtil.get.conf
-        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-        val rawFooters = if (shouldMergeSchemas) {
-          ParquetFileReader.readAllFootersInParallel(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        } else {
-          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        }
-
-        rawFooters.map(footer => footer.getFile -> footer).toMap
-      }
-
       // If we already get the schema, don't need to re-compute it since the schema merging is
       // time-consuming.
       if (dataSchema == null) {
@@ -422,7 +389,7 @@ private[sql] class ParquetRelation2(
       // Always tries the summary files first if users don't require a merged schema.  In this case,
       // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
       // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.
+      // groups.  If no summary file is available, falls back to some random part-file.
       //
       // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
       // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
@@ -457,10 +424,10 @@ private[sql] class ParquetRelation2(
 
       assert(
         filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+        "No predefined schema found, " +
+          s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
+      ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
     }
   }
 }
@@ -519,6 +486,7 @@ private[sql] object ParquetRelation2 extends Logging {
   private[parquet] def initializeDriverSideJobFunc(
       inputFiles: Array[FileStatus])(job: Job): Unit = {
     // We side the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
     if (inputFiles.nonEmpty) {
       FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
@@ -543,7 +511,7 @@ private[sql] object ParquetRelation2 extends Logging {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-      if (serializedSchema == None) {
+      if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
       } else if (!seen.contains(serializedSchema.get)) {
@@ -646,4 +614,106 @@ private[sql] object ParquetRelation2 extends Logging {
       .filter(_.nullable)
     StructType(parquetSchema ++ missingFields)
   }
+
+  /**
+   * Figures out a merged Parquet schema with a distributed Spark job.
+   *
+   * Note that locality is not taken into consideration here because:
+   *
+   *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
+   *     that file.  Thus we only need to retrieve the location of the last block.  However, Hadoop
+   *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
+   *     potentially expensive.
+   *
+   *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
+   *     slow.  And basically locality is not available when using S3 (you can't run computation on
+   *     S3 nodes).
+   */
+  def mergeSchemasInParallel(
+      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+
+    // HACK ALERT:
+    //
+    // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
+    // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
+    // but only `Writable`.  What makes it worth, for some reason, `FileStatus` doesn't play well
+    // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
+    // facts virtually prevents us to serialize `FileStatus`es.
+    //
+    // Since Parquet only relies on path and length information of those `FileStatus`es to read
+    // footers, here we just extract them (which can be easily serialized), send them to executor
+    // side, and resemble fake `FileStatus`es there.
+    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+    // Issues a Spark job to read Parquet schema in parallel.
+    val partiallyMergedSchemas =
+      sqlContext
+        .sparkContext
+        .parallelize(partialFileStatusInfo)
+        .mapPartitions { iterator =>
+          // Resembles fake `FileStatus`es with serialized path and length information.
+          val fakeFileStatuses = iterator.map { case (path, length) =>
+            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+          }.toSeq
+
+          // Skips row group information since we only need the schema
+          val skipRowGroups = true
+
+          // Reads footers in multi-threaded manner within each task
+          val footers =
+            ParquetFileReader.readAllFootersInParallel(
+              serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+          val converter =
+            new CatalystSchemaConverter(
+              assumeBinaryIsString = assumeBinaryIsString,
+              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+              followParquetFormatSpec = followParquetFormatSpec)
+
+          footers.map { footer =>
+            ParquetRelation2.readSchemaFromFooter(footer, converter)
+          }.reduceOption(_ merge _).iterator
+        }.collect()
+
+    partiallyMergedSchemas.reduceOption(_ merge _)
+  }
+
+  /**
+   * Reads Spark SQL schema from a Parquet footer.  If a valid serialized Spark SQL schema string
+   * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
+   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
+   */
+  def readSchemaFromFooter(
+      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+    val fileMetaData = footer.getParquetMetadata.getFileMetaData
+    fileMetaData
+      .getKeyValueMetaData
+      .toMap
+      .get(RowReadSupport.SPARK_METADATA_KEY)
+      .flatMap(deserializeSchemaString)
+      .getOrElse(converter.convert(fileMetaData.getSchema))
+  }
+
+  private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+    // Tries to deserialize the schema string as JSON first, then falls back to the case class
+    // string parser (data generated by older versions of Spark SQL uses this format).
+    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+      case _: Throwable =>
+        logInfo(
+          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "falling back to the deprecated DataType.fromCaseClassString parser.")
+        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+    }.recoverWith {
+      case cause: Throwable =>
+        logWarning(
+          "Failed to parse and ignored serialized Spark schema in " +
+            s"Parquet key-value metadata:\n\t$schemaString", cause)
+        Failure(cause)
+    }.toOption
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index d7440c5..5a8c97c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -247,7 +247,9 @@ private[sql] object ResolvedDataSource {
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
             val patternPath = new Path(caseInsensitiveOptions("path"))
-            SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
           }
 
           val dataSchema =
@@ -272,7 +274,9 @@ private[sql] object ResolvedDataSource {
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
             val patternPath = new Path(caseInsensitiveOptions("path"))
-            SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
           }
           dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
         case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5d7cc2f..2cd8b35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,21 +18,23 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.RDDConversions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.execution.RDDConversions
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -367,7 +369,9 @@ abstract class OutputWriter {
  */
 @Experimental
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
-  extends BaseRelation {
+  extends BaseRelation with Logging {
+
+  logInfo("Constructing HadoopFsRelation")
 
   def this() = this(None)
 
@@ -382,36 +386,40 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    def refresh(): Unit = {
-      // We don't filter files/directories whose name start with "_" except "_temporary" here, as
-      // specific data sources may take advantages over them (e.g. Parquet _metadata and
-      // _common_metadata files). "_temporary" directories are explicitly ignored since failed
-      // tasks/jobs may leave partial/corrupted data files there.
-      def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        if (status.getPath.getName.toLowerCase == "_temporary") {
-          Set.empty
+    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+      if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
+        HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
+      } else {
+        val statuses = paths.flatMap { path =>
+          val hdfsPath = new Path(path)
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+          logInfo(s"Listing $qualified on driver")
+          Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+        }.filterNot { status =>
+          val name = status.getPath.getName
+          name.toLowerCase == "_temporary" || name.startsWith(".")
+        }
+
+        val (dirs, files) = statuses.partition(_.isDir)
+
+        if (dirs.isEmpty) {
+          files.toSet
         } else {
-          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
+    }
 
-      leafFiles.clear()
+    def refresh(): Unit = {
+      val files = listLeafFiles(paths)
 
-      val statuses = paths.flatMap { path =>
-        val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
-      }.filterNot { status =>
-        // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
-        status.getPath.getName.startsWith(".")
-      }
+      leafFiles.clear()
+      leafDirToChildrenFiles.clear()
 
-      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
-      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
+      leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
 
@@ -666,3 +674,63 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    */
   def prepareJobForWrite(job: Job): OutputWriterFactory
 }
+
+private[sql] object HadoopFsRelation extends Logging {
+  // We don't filter files/directories whose name start with "_" except "_temporary" here, as
+  // specific data sources may take advantages over them (e.g. Parquet _metadata and
+  // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+  // tasks/jobs may leave partial/corrupted data files there.  Files and directories whose name
+  // start with "." are also ignored.
+  def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+    logInfo(s"Listing ${status.getPath}")
+    val name = status.getPath.getName.toLowerCase
+    if (name == "_temporary" || name.startsWith(".")) {
+      Array.empty
+    } else {
+      val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+      files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+    }
+  }
+
+  // `FileStatus` is Writable but not serializable.  What make it worse, somehow it doesn't play
+  // well with `SerializableWritable`.  So there seems to be no way to serialize a `FileStatus`.
+  // Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it from
+  // executor side and reconstruct it on driver side.
+  case class FakeFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long)
+
+  def listLeafFilesInParallel(
+      paths: Array[String],
+      hadoopConf: Configuration,
+      sparkContext: SparkContext): Set[FileStatus] = {
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val fakeStatuses = sparkContext.parallelize(paths).flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(serializableConfiguration.value)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty)
+    }.map { status =>
+      FakeFileStatus(
+        status.getPath.toString,
+        status.getLen,
+        status.isDir,
+        status.getReplication,
+        status.getBlockSize,
+        status.getModificationTime,
+        status.getAccessTime)
+    }.collect()
+
+    fakeStatuses.map { f =>
+      new FileStatus(
+        f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
+    }.toSet
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a1064df0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index d0ebb11..37b0a9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -447,7 +447,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
         makePartitionDir(base, defaultPartitionName, "pi" -> 2))
 
-      sqlContext.read.format("parquet").load(base.getCanonicalPath).registerTempTable("t")
+      sqlContext
+        .read
+        .option("mergeSchema", "true")
+        .format("parquet")
+        .load(base.getCanonicalPath)
+        .registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -583,4 +588,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           Seq("a", "a, b"),
           Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
   }
+
+  test("Parallel partition discovery") {
+    withTempPath { dir =>
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
+        val path = dir.getCanonicalPath
+        val df = sqlContext.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1)
+        df.write.partitionBy("b", "c").parquet(path)
+        checkAnswer(sqlContext.read.parquet(path), df)
+      }
+    }
+  }
 }



