spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-17361][SQL] file-based external table without path should not be created
Date Tue, 06 Sep 2016 06:17:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 64e826f91 -> c0ae6bc6e


[SPARK-17361][SQL] file-based external table without path should not be created

## What changes were proposed in this pull request?

Using the public `Catalog` API, users can create a file-based data source table without
giving a path option. Currently such a table is created successfully, but reading it
fails. Ideally we should fail during creation.

This is because when we create a data source table, we resolve the data source relation
without validating the path: `resolveRelation(checkPathExist = false)`.

Looking back at why this trick (`checkPathExist`) was added: when we call `resolveRelation`
for a managed table, we add the path to the data source options, but the path has not been
created yet. So why add this not-yet-created path to the data source options at all? This PR
fixes the problem by adding the path to the options only after `resolveRelation` has been
called. We can then remove the `checkPathExist` parameter from `DataSource.resolveRelation`
and do some related cleanup.
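
For illustration, here is a minimal sketch of the user-facing scenario (it mirrors the new
`CatalogSuite` test added below and assumes a running `SparkSession` named `spark`):

```scala
// Minimal sketch: creating a file-based external table with no "path" option.
// Assumes a running SparkSession named `spark`.
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{IntegerType, StructType}

try {
  spark.catalog.createExternalTable(
    "tbl",
    "json",
    new StructType().add("i", IntegerType),
    Map.empty[String, String])  // file-based source, but no "path" option
} catch {
  // Before this patch the call succeeded and only a later read of `tbl` failed;
  // with this patch creation itself throws:
  // "Cannot create a file-based external data source table without path"
  case e: AnalysisException => println(e.getMessage)
}
```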

## How was this patch tested?

Existing tests and a new test in `CatalogSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14921 from cloud-fan/check-path.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ae6bc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ae6bc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ae6bc6

Branch: refs/heads/master
Commit: c0ae6bc6ea38909730fad36e653d3c7ab0a84b44
Parents: 64e826f
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Tue Sep 6 14:17:47 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Sep 6 14:17:47 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  4 ++--
 .../command/createDataSourceTables.scala        | 24 +++++++++++++-------
 .../spark/sql/execution/command/ddl.scala       |  4 ++--
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../sql/execution/datasources/DataSource.scala  | 12 ++++------
 .../datasources/ListingFileCatalog.scala        | 18 +++------------
 .../datasources/fileSourceInterfaces.scala      |  9 ++------
 .../spark/sql/internal/CatalogSuite.scala       | 17 ++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 9 files changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 79231ee..e74fa6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -156,9 +156,9 @@ case class CatalogTable(
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
       serde: Option[String] = storage.serde,
-      serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+      properties: Map[String, String] = storage.properties): CatalogTable = {
     copy(storage = CatalogStorageFormat(
-      locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+      locationUri, inputFormat, outputFormat, serde, compressed, properties))
   }
 
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c7e3279..b1830e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = optionsWithPath).resolveRelation(checkPathExist = false)
+        options = table.storage.properties).resolveRelation()
+
+    dataSource match {
+      case fs: HadoopFsRelation =>
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+          throw new AnalysisException(
+            "Cannot create a file-based external data source table without path")
+        }
+      case _ =>
+    }
 
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }
+
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 53fb684..bc1c4f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
     if (partSpec.isEmpty) {
       val newTable = table.withNewStorage(
         serde = serdeClassName.orElse(table.storage.serde),
-        serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+        properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
           if (DDLUtils.isDatasourceTable(table)) {
             table.withNewStorage(
               locationUri = Some(location),
-              serdeProperties = table.storage.properties ++ Map("path" -> location))
+              properties = table.storage.properties ++ Map("path" -> location))
           } else {
             table.withNewStorage(locationUri = Some(location))
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4e6caae..027f358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
       if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
         val newPath = catalog.defaultTablePath(newTblName)
         val newTable = table.withNewStorage(
-          serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+          properties = table.storage.properties ++ Map("path" -> newPath))
         catalog.alterTable(newTable)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9c99a80..71807b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -315,12 +315,8 @@ case class DataSource(
   /**
   * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
-   *
-   * @param checkPathExist A flag to indicate whether to check the existence of path or not.
-   *                       This flag will be set to false when we create an empty table (the
-   *                       path of the table does not exist).
    */
-  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+  def resolveRelation(): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -367,11 +363,11 @@ case class DataSource(
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-          if (checkPathExist && globPath.isEmpty) {
+          if (globPath.isEmpty) {
             throw new AnalysisException(s"Path does not exist: $qualified")
           }
           // Sufficient to check head of the globPath seq for non-glob scenario
-          if (checkPathExist && !fs.exists(globPath.head)) {
+          if (!fs.exists(globPath.head)) {
             throw new AnalysisException(s"Path does not exist: ${globPath.head}")
           }
           globPath
@@ -391,7 +387,7 @@ case class DataSource(
 
         val fileCatalog =
           new ListingFileCatalog(
-            sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+            sparkSession, globbedPaths, options, partitionSchema)
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality = sparkSession.sessionState.conf.resolver

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 706ec6b..60742bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
  * @param paths a list of paths to scan
 * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- *                           [[FileNotFoundException]] in file listing. Note that this is a hack
- *                           for SPARK-16313. We should get rid of this flag in the future.
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
     override val paths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
-    ignoreFileNotFound: Boolean = false)
+    partitionSchema: Option[StructType])
   extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
       // Right now, the number of paths is less than the value of
       // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -104,12 +97,7 @@ class ListingFileCatalog(
         logTrace(s"Listing $path on driver")
 
         val childStatuses = {
-          val stats =
-            try {
-              fs.listStatus(path)
-            } catch {
-              case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-            }
+          val stats = fs.listStatus(path)
           if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 7e40c35..5cc5f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession,
-      ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
     assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
@@ -462,11 +461,7 @@ object HadoopFsRelation extends Logging {
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        try {
-          listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        } catch {
-          case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-        }
+        listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
       }
     }.map { status =>
       val blockLocations = status match {

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8aa8185..b221eed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 
 /**
@@ -305,6 +306,22 @@ class CatalogSuite
     columnFields.foreach { f => assert(columnString.contains(f.toString)) }
   }
 
+  test("createExternalTable should fail if path is not given for file-based data source") {
+    val e = intercept[AnalysisException] {
+      spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
+    }
+    assert(e.message.contains("Unable to infer schema"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.catalog.createExternalTable(
+        "tbl",
+        "json",
+        new StructType().add("i", IntegerType),
+        Map.empty[String, String])
+    }
+    assert(e2.message == "Cannot create a file-based external data source table without path")
+  }
+
   // TODO: add tests for the rest of them
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ae6bc6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c48d4ed..8410a2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             options = table.storage.properties)
 
         LogicalRelation(
-          dataSource.resolveRelation(checkPathExist = true),
+          dataSource.resolveRelation(),
           catalogTable = Some(table))
       }
     }



