spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every batch
Date Mon, 24 Oct 2016 17:52:04 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 aef65ac02 -> bad15bcdf


[SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every batch

## What changes were proposed in this pull request?

In `FileStreamSource.getBatch`, we create a `DataSource` with the user-specified schema, to avoid
inferring the schema again and again. However, we don't pass the partition columns, so the
partitions are still inferred again and again in every batch.

This PR fixes it by keeping the partition columns in `FileStreamSource`, just like the schema.
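
For intuition, here is a minimal, self-contained sketch of the pattern this change applies (illustrative names only, not Spark's internal API): the expensive partition discovery runs once when the source is constructed, and its result is reused by every batch.

```scala
object InferOnceSketch {
  // Stand-in for the expensive directory listing and partition discovery;
  // the names in this sketch are hypothetical, not Spark's.
  def inferPartitionColumns(path: String): Seq[String] = {
    println(s"inferring partition columns under $path ...")
    Seq("date")
  }

  class FileSource(path: String) {
    // Computed once when the source is constructed, analogous to caching the
    // partition columns in FileStreamSource alongside the schema.
    private val partitionColumns: Seq[String] = inferPartitionColumns(path)

    // Each batch reuses the cached columns instead of re-inferring them.
    def getBatch(batchId: Long): String =
      s"batch $batchId: read $path with partition columns ${partitionColumns.mkString(", ")}"
  }

  def main(args: Array[String]): Unit = {
    val source = new FileSource("/data/events")
    (0L to 2L).foreach(id => println(source.getBatch(id)))
  }
}
```

Before the patch, the analogue of `inferPartitionColumns` effectively ran inside `getBatch`, i.e. once per micro-batch.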

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15581 from cloud-fan/stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bad15bcd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bad15bcd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bad15bcd

Branch: refs/heads/branch-2.0
Commit: bad15bcdf3d3907e4c2412c7f36344f2c6a19587
Parents: aef65ac
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Fri Oct 21 15:28:16 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Mon Oct 24 10:51:04 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 26 ++++++++++++++------
 .../execution/streaming/FileStreamSource.scala  |  2 ++
 .../streaming/FileStreamSourceSuite.scala       |  2 +-
 3 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bad15bcd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 251f54b..9d2fee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -74,7 +74,7 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty) extends Logging {
 
-  case class SourceInfo(name: String, schema: StructType)
+  case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
   lazy val providingClass: Class[_] = lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
@@ -185,8 +185,11 @@ case class DataSource(
     }
   }
 
-  private def inferFileFormatSchema(format: FileFormat): StructType = {
-    userSpecifiedSchema.orElse {
+  /**
+   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   */
+  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
+    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -196,14 +199,14 @@ case class DataSource(
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
-      val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
+      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
       val inferred = format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
         fileCatalog.allFiles())
 
       inferred.map { inferredSchema =>
-        StructType(inferredSchema ++ partitionCols)
+        StructType(inferredSchema ++ partitionSchema) -> partitionSchema.map(_.name)
       }
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be specified manually.")
@@ -216,7 +219,7 @@ case class DataSource(
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
           sparkSession.sqlContext, userSpecifiedSchema, className, options)
-        SourceInfo(name, schema)
+        SourceInfo(name, schema, Nil)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -245,7 +248,8 @@ case class DataSource(
               "you may be able to create a static DataFrame on that directory with " +
               "'spark.read.load(directory)' and infer schema from it.")
         }
-        SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
+        val (schema, partCols) = inferFileFormatSchema(format)
+        SourceInfo(s"FileSource[$path]", schema, partCols)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -265,7 +269,13 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
         new FileStreamSource(
-          sparkSession, path, className, sourceInfo.schema, metadataPath, options)
+          sparkSession = sparkSession,
+          path = path,
+          fileFormatClassName = className,
+          schema = sourceInfo.schema,
+          partitionColumns = sourceInfo.partitionColumns,
+          metadataPath = metadataPath,
+          options = options)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

http://git-wip-us.apache.org/repos/asf/spark/blob/bad15bcd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index ae3e709..5ada238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -35,6 +35,7 @@ class FileStreamSource(
     path: String,
     fileFormatClassName: String,
     override val schema: StructType,
+    partitionColumns: Seq[String],
     metadataPath: String,
     options: Map[String, String]) extends Source with Logging {
 
@@ -142,6 +143,7 @@ class FileStreamSource(
         sparkSession,
         paths = files.map(_.path),
         userSpecifiedSchema = Some(schema),
+        partitionColumns = partitionColumns,
         className = fileFormatClassName,
         options = optionsWithPartitionBasePath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/bad15bcd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 1793db0..3bad5bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -94,7 +94,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
         new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
 
-      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)
       // this method should throw an exception if `fs.exists` is called during resolveRelation
       newSource.getBatch(None, LongOffset(1))
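
For context, a hedged end-to-end usage sketch of the scenario this change speeds up: a file stream over a partitioned directory. The directory layout, schema, and column names below are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object PartitionedFileStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PartitionedFileStreamExample")
      .master("local[*]")
      .getOrCreate()

    // Assumed layout: /data/events/date=2016-10-24/part-*.parquet
    // `date` is the partition column encoded in the directory structure.
    val schema = new StructType()
      .add("id", LongType)
      .add("value", StringType)
      .add("date", StringType)

    // With this fix, the partition columns are discovered once when the source is
    // created, instead of being re-inferred from the directory listing in every batch.
    val events = spark.readStream
      .schema(schema)
      .parquet("/data/events")

    events.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```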


