spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-17990][SPARK-18302][SQL] correct several partition related behaviours of ExternalCatalog
Date Thu, 10 Nov 2016 21:42:52 GMT
Repository: spark
Updated Branches:
  refs/heads/master b533fa2b2 -> 2f7461f31


[SPARK-17990][SPARK-18302][SQL] correct several partition related behaviours of ExternalCatalog

## What changes were proposed in this pull request?

This PR corrects several partition-related behaviors of `ExternalCatalog`:

1. The default partition location should not always lowercase the partition column names in the path string (fix in `HiveExternalCatalog`).
2. Renaming a partition should not always lowercase the partition column names in the updated partition path string (fix in `HiveExternalCatalog`).
3. Renaming a partition should update the partition location only for managed tables (fix in `InMemoryCatalog`).
4. Creating a partition whose directory already exists should succeed (fix in `InMemoryCatalog`).
5. Creating a partition whose directory does not exist should create that directory (fix in `InMemoryCatalog`).
6. Dropping a partition from an external table should not delete its directory (fix in `InMemoryCatalog`).

## How was this patch tested?

new tests in `ExternalCatalogSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15797 from cloud-fan/partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f7461f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f7461f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f7461f3

Branch: refs/heads/master
Commit: 2f7461f31331cfc37f6cfa3586b7bbefb3af5547
Parents: b533fa2
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Nov 10 13:42:48 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Nov 10 13:42:48 2016 -0800

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogUtils.scala | 121 +++++++++++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  92 ++++++------
 .../spark/sql/catalyst/catalog/interface.scala  |  11 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala | 150 +++++++++++++++----
 .../catalyst/catalog/SessionCatalogSuite.scala  |  24 ++-
 .../spark/sql/execution/command/ddl.scala       |   8 +-
 .../spark/sql/execution/command/tables.scala    |   3 +-
 .../datasources/CatalogFileIndex.scala          |   2 +-
 .../datasources/DataSourceStrategy.scala        |   2 +-
 .../datasources/FileFormatWriter.scala          |   6 +-
 .../PartitioningAwareFileIndex.scala            |   2 -
 .../datasources/PartitioningUtils.scala         |  94 +-----------
 .../spark/sql/execution/command/DDLSuite.scala  |   8 +-
 .../ParquetPartitionDiscoverySuite.scala        |  21 +--
 .../spark/sql/hive/HiveExternalCatalog.scala    |  51 ++++++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |   4 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   2 +-
 19 files changed, 397 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
new file mode 100644
index 0000000..b1442ee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+
+/**
+ * Utilities shared by [[ExternalCatalog]] implementations: escaping and unescaping of partition
+ * directory names, and construction of default partition paths.
+ */
+object ExternalCatalogUtils {
+  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
+  // depend on Hive.
+  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * ASCII characters that must be percent-escaped in a partition directory name. When running on
+   * Windows, space, `<`, `>` and `|` are escaped as well.
+   */
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  /** Returns true if `c` is one of the ASCII characters registered in [[charToEscape]]. */
+  def needsEscaping(c: Char): Boolean = {
+    // A `Char` is never negative, so only the upper bound of the bit set needs checking.
+    c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  /**
+   * Percent-escapes every character of `path` for which [[needsEscaping]] holds, writing it as
+   * `%` followed by two upper-case hex digits (e.g. `/` becomes `%2F`).
+   */
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.toInt}%02X")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+  /**
+   * Reverses [[escapePathName]]. Every `%XY` sequence in which X and Y are both ASCII hex digits
+   * is decoded to the character with that code. Any other `%`, including a trailing one, is kept
+   * literally.
+   */
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      // Decode by hand rather than with `Integer.parseInt`. parseInt also accepts a leading sign
+      // (so "%+F" would decode to \u000F) and non-ASCII digits, neither of which is a valid escape.
+      val code =
+        if (c == '%' && i + 2 < path.length) {
+          val high = hexDigitValue(path.charAt(i + 1))
+          val low = hexDigitValue(path.charAt(i + 2))
+          if (high >= 0 && low >= 0) high * 16 + low else -1
+        } else {
+          -1
+        }
+      if (code >= 0) {
+        sb.append(code.toChar)
+        i += 3
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
+
+  /** Returns the value of an ASCII hex digit (0-9, a-f, A-F), or -1 for any other character. */
+  private def hexDigitValue(ch: Char): Int = if (ch < 128) Character.digit(ch, 16) else -1
+
+  /**
+   * Builds the default location of a partition: one `col=value` directory per partition column,
+   * in the order of `partitionColumnNames`, nested under `tablePath`. Column names and values
+   * are escaped with [[escapePathName]]. A null or empty value is written as
+   * [[DEFAULT_PARTITION_NAME]], matching Hive's behaviour.
+   *
+   * @throws NoSuchElementException if `spec` has no value for one of `partitionColumnNames`.
+   */
+  def generatePartitionPath(
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String],
+      tablePath: Path): Path = {
+    val partitionPathStrings = partitionColumnNames.map { col =>
+      val partitionValue = spec.getOrElse(col, {
+        val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
+        throw new NoSuchElementException(
+          s"Partition spec [$specString] does not contain partition column $col")
+      })
+      // An empty value would otherwise yield a directory named `col=`. Hive maps it to the
+      // default partition instead.
+      val partitionString = if (partitionValue == null || partitionValue.isEmpty) {
+        DEFAULT_PARTITION_NAME
+      } else {
+        escapePathName(partitionValue)
+      }
+      escapePathName(col) + "=" + partitionString
+    }
+    partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
+      new Path(totalPath, nextPartPath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 20db81e..a3ffeaa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -231,7 +231,7 @@ class InMemoryCatalog(
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
             "to it if it doesn't have one.")
-        val dir = new Path(tableMeta.storage.locationUri.get)
+        val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
@@ -259,7 +259,7 @@ class InMemoryCatalog(
       assert(oldDesc.table.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
+      val oldDir = new Path(oldDesc.table.location)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -355,25 +355,28 @@ class InMemoryCatalog(
       }
     }
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
     // TODO: we should follow hive to roll back if one partition path failed to create.
     parts.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (p.storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.spec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.mkdirs(new Path(tableDir, partitionPath))
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create partition path $partitionPath", e)
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+
+      try {
+        val fs = tablePath.getFileSystem(hadoopConfig)
+        if (!fs.exists(partitionPath)) {
+          fs.mkdirs(partitionPath)
         }
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create partition path $partitionPath", e)
       }
-      existingParts.put(p.spec, p)
+
+      existingParts.put(
+        p.spec,
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
     }
   }
 
@@ -392,19 +395,15 @@ class InMemoryCatalog(
       }
     }
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
-    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    // TODO: we should follow hive to roll back if one partition path failed to delete, and support
+    // partial partition spec.
     partSpecs.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.get(col).map(col + "=" + _)
-        }.mkString("/")
+      if (existingParts.contains(p) && shouldRemovePartitionLocation) {
+        val partitionPath = new Path(existingParts(p).location)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.delete(new Path(tableDir, partitionPath), true)
+          val fs = partitionPath.getFileSystem(hadoopConfig)
+          fs.delete(partitionPath, true)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to delete partition path $partitionPath", e)
@@ -423,33 +422,34 @@ class InMemoryCatalog(
     requirePartitionsExist(db, table, specs)
     requirePartitionsNotExist(db, table, newSpecs)
 
-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val shouldUpdatePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val existingParts = catalog(db).tables(table).partitions
     // TODO: we should follow hive to roll back if one partition path failed to rename.
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
-      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
-      val existingParts = catalog(db).tables(table).partitions
-
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (newPart.storage.locationUri.isEmpty) {
-        val oldPath = partitionColumnNames.flatMap { col =>
-          oldSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        val newPath = partitionColumnNames.flatMap { col =>
-          newSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
+      val oldPartition = getPartition(db, table, oldSpec)
+      val newPartition = if (shouldUpdatePartitionLocation) {
+        val oldPartPath = new Path(oldPartition.location)
+        val newPartPath = ExternalCatalogUtils.generatePartitionPath(
+          newSpec, partitionColumnNames, tablePath)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+          val fs = tablePath.getFileSystem(hadoopConfig)
+          fs.rename(oldPartPath, newPartPath)
         } catch {
           case e: IOException =>
-            throw new SparkException(s"Unable to rename partition path $oldPath", e)
+            throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
         }
+        oldPartition.copy(
+          spec = newSpec,
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+      } else {
+        oldPartition.copy(spec = newSpec)
       }
 
       existingParts.remove(oldSpec)
-      existingParts.put(newSpec, newPart)
+      existingParts.put(newSpec, newPartition)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34748a0..93c70de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -99,6 +99,12 @@ case class CatalogTablePartition(
     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }
 
+  /**
+   * Returns the location URI recorded in this partition's storage format.
+   *
+   * @throws AnalysisException if the partition's storage has no location URI.
+   */
+  def location: String = {
+    def describeSpec: String = spec.map { case (col, value) => s"$col=$value" }.mkString(", ")
+    storage.locationUri.getOrElse(
+      throw new AnalysisException(s"Partition [$describeSpec] did not specify locationUri"))
+  }
+
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
@@ -171,6 +177,11 @@ case class CatalogTable(
     throw new AnalysisException(s"table $identifier did not specify database")
   }
 
+  /**
+   * Returns the location URI recorded in this table's storage format.
+   *
+   * @throws AnalysisException if the table's storage has no location URI.
+   */
+  def location: String =
+    storage.locationUri.getOrElse {
+      val message = s"table $identifier did not specify locationUri"
+      throw new AnalysisException(message)
+    }
+
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 34bdfc8..303a866 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import java.io.File
-import java.net.URI
-
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
@@ -320,6 +319,33 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
   }
 
+  test("create partitions without location") {
+    val catalog = newBasicCatalog()
+    // A managed table created without an explicit location. The test later reads its location
+    // back via `.location`, so the catalog must have assigned one.
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    // NOTE(review): assumes the shared `storageFormat` fixture carries no locationUri, so the
+    // catalog has to fill in the default partition location; confirm in CatalogTestUtils.
+    val partition = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    catalog.createPartitions("db1", "tbl", Seq(partition), ignoreIfExists = false)
+
+    // Expected default location: <table location>/partCol1=1/partCol2=2. The partition column
+    // names must keep their original (mixed) case.
+    val partitionLocation = catalog.getPartition(
+      "db1",
+      "tbl",
+      Map("partCol1" -> "1", "partCol2" -> "2")).location
+    val tableLocation = catalog.getTable("db1", "tbl").location
+    val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
+    assert(new Path(partitionLocation) == defaultPartitionLocation)
+  }
+
   test("list partitions with partial partition spec") {
     val catalog = newBasicCatalog()
     val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
@@ -399,6 +425,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
   }
 
+  test("rename partitions should update the location for managed table") {
+    val catalog = newBasicCatalog()
+    // Managed, partitioned table with no explicit location (the catalog assigns one).
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val tableLocation = catalog.getTable("db1", "tbl").location
+
+    // The partition column names are mixed case, which checks that the generated directory
+    // names preserve case.
+    val mixedCasePart1 = CatalogTablePartition(
+      Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    val mixedCasePart2 = CatalogTablePartition(
+      Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+
+    // A newly created partition lands in the default location under the table directory.
+    catalog.createPartitions("db1", "tbl", Seq(mixedCasePart1), ignoreIfExists = false)
+    assert(
+      new Path(catalog.getPartition("db1", "tbl", mixedCasePart1.spec).location) ==
+        new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2"))
+
+    // Renaming a partition of a managed table moves it to the default location of the new spec.
+    catalog.renamePartitions("db1", "tbl", Seq(mixedCasePart1.spec), Seq(mixedCasePart2.spec))
+    assert(
+      new Path(catalog.getPartition("db1", "tbl", mixedCasePart2.spec).location) ==
+        new Path(new Path(tableLocation, "partCol1=3"), "partCol2=4"))
+
+    // For external tables, RENAME PARTITION should not update the partition location.
+    // NOTE(review): relies on newBasicCatalog() having created db2.tbl2 as an EXTERNAL table
+    // that already contains part1.
+    val existingPartLoc = catalog.getPartition("db2", "tbl2", part1.spec).location
+    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part3.spec))
+    assert(
+      new Path(catalog.getPartition("db2", "tbl2", part3.spec).location) ==
+        new Path(existingPartLoc))
+  }
+
   test("rename partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
@@ -419,11 +485,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("alter partitions") {
     val catalog = newBasicCatalog()
     try {
-      // Note: Before altering table partitions in Hive, you *must* set the current database
-      // to the one that contains the table of interest. Otherwise you will end up with the
-      // most helpful error message ever: "Unable to alter partition. alter is not possible."
-      // See HIVE-2742 for more detail.
-      catalog.setCurrentDatabase("db2")
       val newLocation = newUriForDatabase()
       val newSerde = "com.sparkbricks.text.EasySerde"
       val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
@@ -571,10 +632,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   // --------------------------------------------------------------------------
 
   private def exists(uri: String, children: String*): Boolean = {
-    val base = new File(new URI(uri))
-    children.foldLeft(base) {
-      case (parent, child) => new File(parent, child)
-    }.exists()
+    // Returns whether `uri/child1/child2/...` exists. The path is checked through the Hadoop
+    // FileSystem that `uri` resolves to, rather than java.io.File.
+    val base = new Path(uri)
+    val finalPath = children.foldLeft(base) {
+      case (parent, child) => new Path(parent, child)
+    }
+    base.getFileSystem(new Configuration()).exists(finalPath)
   }
 
   test("create/drop database should create/delete the directory") {
@@ -623,7 +685,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("create/drop/rename partitions should create/delete/rename the directory") {
     val catalog = newBasicCatalog()
-    val databaseDir = catalog.getDatabase("db1").locationUri
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
@@ -631,34 +692,61 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
-        .add("a", "int")
-        .add("b", "string"),
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
       provider = Some("hive"),
-      partitionColumnNames = Seq("a", "b")
-    )
+      partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)
 
+    val tableLocation = catalog.getTable("db1", "tbl").location
+
+    val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    val part2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+    val part3 = CatalogTablePartition(Map("partCol1" -> "5", "partCol2" -> "6"), storageFormat)
+
     catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false)
-    assert(exists(databaseDir, "tbl", "a=1", "b=2"))
-    assert(exists(databaseDir, "tbl", "a=3", "b=4"))
+    assert(exists(tableLocation, "partCol1=1", "partCol2=2"))
+    assert(exists(tableLocation, "partCol1=3", "partCol2=4"))
 
     catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec))
-    assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
-    assert(exists(databaseDir, "tbl", "a=5", "b=6"))
+    assert(!exists(tableLocation, "partCol1=1", "partCol2=2"))
+    assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
 
     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
       purge = false)
-    assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
-    assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
+    assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
+    assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
 
-    val externalPartition = CatalogTablePartition(
-      Map("a" -> "7", "b" -> "8"),
+    val tempPath = Utils.createTempDir()
+    // create partition with existing directory is OK.
+    val partWithExistingDir = CatalogTablePartition(
+      Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
-        None, None, None, false, Map.empty)
-    )
-    catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false)
-    assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
+        Some(tempPath.getAbsolutePath),
+        None, None, None, false, Map.empty))
+    catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
+
+    tempPath.delete()
+    // create partition with non-existing directory will create that directory.
+    val partWithNonExistingDir = CatalogTablePartition(
+      Map("partCol1" -> "9", "partCol2" -> "10"),
+      CatalogStorageFormat(
+        Some(tempPath.getAbsolutePath),
+        None, None, None, false, Map.empty))
+    catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
+    assert(tempPath.exists())
+  }
+
+  test("drop partition from external table should not delete the directory") {
+    val catalog = newBasicCatalog()
+    // NOTE(review): assumes newBasicCatalog() creates db2.tbl1 as an EXTERNAL table (e.g. via
+    // newTable) with a location; confirm in CatalogTestUtils.
+    catalog.createPartitions("db2", "tbl1", Seq(part1), ignoreIfExists = false)
+
+    // Creating the partition must have materialized its directory.
+    val partPath = new Path(catalog.getPartition("db2", "tbl1", part1.spec).location)
+    val fs = partPath.getFileSystem(new Configuration)
+    assert(fs.exists(partPath))
+
+    // Dropping a partition of an external table removes only the metadata; the directory stays.
+    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+    assert(fs.exists(partPath))
   }
 }
 
@@ -731,7 +819,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat,
+      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 001d9c4..52385de 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -527,13 +527,13 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
     sessionCatalog.createPartitions(
       TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("mydb", "tbl"), part1, part2))
     // Create partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
     sessionCatalog.createPartitions(
       TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
     assert(catalogPartitionsEqual(
-      externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
+      externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder))
   }
 
   test("create partitions when database/table does not exist") {
@@ -586,13 +586,13 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop partitions") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
       purge = false)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
@@ -604,7 +604,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
       TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
@@ -844,10 +844,11 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2))
     // List partitions without explicitly specifying database
     catalog.setCurrentDatabase("db2")
-    assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
   }
 
   test("list partitions when database/table does not exist") {
@@ -860,6 +861,15 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  /**
+   * Compares two groups of partitions as sets while ignoring every partition's location URI.
+   * An ExternalCatalog may assign a default location, so the locations are not comparable.
+   */
+  private def catalogPartitionsEqual(
+      actualParts: Seq[CatalogTablePartition],
+      expectedParts: CatalogTablePartition*): Boolean = {
+    def withoutLocation(part: CatalogTablePartition): CatalogTablePartition =
+      part.copy(storage = part.storage.copy(locationUri = None))
+    val actual = actualParts.map(withoutLocation).toSet
+    val expected = expectedParts.map(withoutLocation).toSet
+    actual == expected
+  }
+
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8500ab4..84a63fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
@@ -500,7 +500,7 @@ case class AlterTableRecoverPartitionsCommand(
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(table.location)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
@@ -558,9 +558,9 @@ case class AlterTableRecoverPartitionsCommand(
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
-        val columnName = PartitioningUtils.unescapePathName(ps(0))
+        val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
         // TODO: Validate the value
-        val value = PartitioningUtils.unescapePathName(ps(1))
+        val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
             partitionNames.drop(1), threshold, resolver)

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e49a1f5..119e732 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -710,7 +710,8 @@ case class ShowPartitionsCommand(
 
   private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
     partColNames.map { name =>
-      PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+      ExternalCatalogUtils.escapePathName(name) + "=" +
+        ExternalCatalogUtils.escapePathName(spec(name))
     }.mkString(File.separator)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 443a2ec..4ad91dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,7 @@ class CatalogFileIndex(
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
-        val path = new Path(p.storage.locationUri.get)
+        val path = new Path(p.location)
         val fs = path.getFileSystem(hadoopConf)
         PartitionPath(
           p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2d43a6a..739aeac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -190,7 +190,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       val effectiveOutputPath = if (overwritingSinglePartition) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
           l.catalogTable.get.identifier, overwrite.specificPartition.get)
-        new Path(partition.storage.locationUri.get)
+        new Path(partition.location)
       } else {
         outputPath
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index fa7fe14..69b3fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
@@ -281,11 +281,11 @@ object FileFormatWriter extends Logging {
     private def partitionStringExpression: Seq[Expression] = {
       description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
         val escaped = ScalaUDF(
-          PartitioningUtils.escapePathName _,
+          ExternalCatalogUtils.escapePathName _,
           StringType,
           Seq(Cast(c, StringType)),
           Seq(StringType))
-        val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+        val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
         val partitionName = Literal(c.name + "=") :: str :: Nil
         if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index a8a722d..3740caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -128,7 +128,6 @@ abstract class PartitioningAwareFileIndex(
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
           typeInference = false,
           basePaths = basePaths)
 
@@ -148,7 +147,6 @@ abstract class PartitioningAwareFileIndex(
       case _ =>
         PartitioningUtils.parsePartitions(
           leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
           typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
           basePaths = basePaths)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b51b418..a28b04c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -56,15 +55,15 @@ object PartitionSpec {
 }
 
 object PartitioningUtils {
-  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
-  // depend on Hive.
-  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
 
   private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
   {
     require(columnNames.size == literals.size)
   }
 
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+
   /**
    * Given a group of qualified paths, tries to parse them and returns a partition specification.
    * For example, given:
@@ -90,12 +89,11 @@ object PartitioningUtils {
    */
   private[datasources] def parsePartitions(
       paths: Seq[Path],
-      defaultPartitionName: String,
       typeInference: Boolean,
       basePaths: Set[Path]): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, defaultPartitionName, typeInference, basePaths)
+      parsePartition(path, typeInference, basePaths)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -173,7 +171,6 @@ object PartitioningUtils {
    */
   private[datasources] def parsePartition(
       path: Path,
-      defaultPartitionName: String,
       typeInference: Boolean,
       basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
@@ -196,7 +193,7 @@ object PartitioningUtils {
         // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
         // Once we get the string, we try to parse it and find the partition column and value.
         val maybeColumn =
-          parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
+          parsePartitionColumn(currentPath.getName, typeInference)
         maybeColumn.foreach(columns += _)
 
         // Now, we determine if we should stop.
@@ -228,7 +225,6 @@ object PartitioningUtils {
 
   private def parsePartitionColumn(
       columnSpec: String,
-      defaultPartitionName: String,
       typeInference: Boolean): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
@@ -240,7 +236,7 @@ object PartitioningUtils {
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
-      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
+      val literal = inferPartitionColumnValue(rawColumnValue, typeInference)
       Some(columnName -> literal)
     }
   }
@@ -355,7 +351,6 @@ object PartitioningUtils {
    */
   private[datasources] def inferPartitionColumnValue(
       raw: String,
-      defaultPartitionName: String,
       typeInference: Boolean): Literal = {
     val decimalTry = Try {
       // `BigDecimal` conversion can fail when the `field` is not a form of number.
@@ -380,14 +375,14 @@ object PartitioningUtils {
         .orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
         // Then falls back to string
         .getOrElse {
-          if (raw == defaultPartitionName) {
+          if (raw == DEFAULT_PARTITION_NAME) {
             Literal.create(null, NullType)
           } else {
             Literal.create(unescapePathName(raw), StringType)
           }
         }
     } else {
-      if (raw == defaultPartitionName) {
+      if (raw == DEFAULT_PARTITION_NAME) {
         Literal.create(null, NullType)
       } else {
         Literal.create(unescapePathName(raw), StringType)
@@ -450,77 +445,4 @@ object PartitioningUtils {
       Literal.create(Cast(l, desiredType).eval(), desiredType)
     }
   }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  val charToEscape = {
-    val bitSet = new java.util.BitSet(128)
-
-    /**
-     * ASCII 01-1F are HTTP control characters that need to be escaped.
-     * \u000A and \u000D are \n and \r, respectively.
-     */
-    val clist = Array(
-      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
-      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
-      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
-      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
-      '{', '[', ']', '^')
-
-    clist.foreach(bitSet.set(_))
-
-    if (Shell.WINDOWS) {
-      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
-    }
-
-    bitSet
-  }
-
-  def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
-  }
-
-  def escapePathName(path: String): String = {
-    val builder = new StringBuilder()
-    path.foreach { c =>
-      if (needsEscaping(c)) {
-        builder.append('%')
-        builder.append(f"${c.asInstanceOf[Int]}%02X")
-      } else {
-        builder.append(c)
-      }
-    }
-
-    builder.toString()
-  }
-
-  def unescapePathName(path: String): String = {
-    val sb = new StringBuilder
-    var i = 0
-
-    while (i < path.length) {
-      val c = path.charAt(i)
-      if (c == '%' && i + 2 < path.length) {
-        val code: Int = try {
-          Integer.parseInt(path.substring(i + 1, i + 3), 16)
-        } catch {
-          case _: Exception => -1
-        }
-        if (code >= 0) {
-          sb.append(code.asInstanceOf[Char])
-          i += 3
-        } else {
-          sb.append(c)
-          i += 1
-        }
-      } else {
-        sb.append(c)
-        i += 1
-      }
-    }
-
-    sb.toString()
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index df3a3c3..363715c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -875,7 +875,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     val part2 = Map("a" -> "2", "b" -> "6")
-    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
@@ -1133,7 +1133,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
     assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
     assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
-    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
     assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
     // Verify that the location is set to the expected string
     def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
@@ -1296,9 +1296,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
       "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
     assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
-    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
 
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 120a3a2..22e35a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
@@ -48,11 +49,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   import PartitioningUtils._
   import testImplicits._
 
-  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
+  val defaultPartitionName = ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 
   test("column type inference") {
     def check(raw: String, literal: Literal): Unit = {
-      assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
+      assert(inferPartitionColumnValue(raw, true) === literal)
     }
 
     check("10", Literal.create(10, IntegerType))
@@ -76,7 +77,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path])
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -88,7 +89,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/")))
 
@@ -101,7 +101,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/something=true/table")))
 
@@ -114,7 +113,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/table=true")))
 
@@ -127,7 +125,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     exception = intercept[AssertionError] {
       parsePartitions(
         paths.map(new Path(_)),
-        defaultPartitionName,
         true,
         Set(new Path("hdfs://host:9000/path/")))
     }
@@ -147,7 +144,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     exception = intercept[AssertionError] {
       parsePartitions(
         paths.map(new Path(_)),
-        defaultPartitionName,
         true,
         Set(new Path("hdfs://host:9000/tmp/tables/")))
     }
@@ -156,13 +152,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path])._1
       assert(expected === actual)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
+        parsePartition(new Path(path), true, Set.empty[Path])
       }.getMessage
 
       assert(message.contains(expected))
@@ -204,7 +200,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     // when the basePaths is the same as the path to a leaf directory
     val partitionSpec1: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
-      defaultPartitionName = defaultPartitionName,
       typeInference = true,
       basePaths = Set(new Path("file://path/a=10")))._1
 
@@ -213,7 +208,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     // when the basePaths is the path to a base directory of leaf directories
     val partitionSpec2: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
-      defaultPartitionName = defaultPartitionName,
       typeInference = true,
       basePaths = Set(new Path("file://path")))._1
 
@@ -231,7 +225,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val actualSpec =
         parsePartitions(
           paths.map(new Path(_)),
-          defaultPartitionName,
           true,
           rootPaths)
       assert(actualSpec === spec)
@@ -314,7 +307,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
-        parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
+        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path])
       assert(actualSpec === spec)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b537061..42ce1a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
 import java.util
 
 import scala.util.control.NonFatal
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -255,7 +256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           // compatible format, which means the data source is file-based and must have a `path`.
           require(tableDefinition.storage.locationUri.isDefined,
             "External file-based data source table must have a `path` entry in storage properties.")
-          Some(new Path(tableDefinition.storage.locationUri.get).toUri.toString)
+          Some(new Path(tableDefinition.location).toUri.toString)
         } else {
           None
         }
@@ -789,7 +790,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val partsWithLocation = parts.map { p =>
+      // Ideally we could leave the partition location empty and let the Hive metastore set it.
+      // However, the Hive metastore is not case preserving and would generate a wrong location
+      // with lower-cased partition column names. Here we set the default partition location
+      // manually to avoid this problem.
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+    }
+    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }
 
@@ -810,6 +825,31 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
       db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    // Hive metastore is not case preserving and keeps partition columns with lower-cased names.
+    // When Hive renames partitions of a managed table, it creates the partition location with
+    // a default path generated from the new spec with lower-cased partition column names. This
+    // is unexpected, so we rename the directories manually and alter the partition locations.
+    val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
+    if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
+      val tablePath = new Path(tableMeta.location)
+      val newParts = newSpecs.map { spec =>
+        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val wrongPath = new Path(partition.location)
+        val rightPath = ExternalCatalogUtils.generatePartitionPath(
+          spec, partitionColumnNames, tablePath)
+        try {
+          tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+        } catch {
+          case e: IOException => throw new SparkException(
+            s"Unable to rename partition path from $wrongPath to $rightPath", e)
+        }
+        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+      }
+      alterPartitions(db, table, newParts)
+    }
   }
 
   override def alterPartitions(
@@ -817,6 +857,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
     val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    // Note: Before altering table partitions in Hive, you *must* set the current database
+    // to the one that contains the table of interest. Otherwise you will end up with the
+    // most helpful error message ever: "Unable to alter partition. alter is not possible."
+    // See HIVE-2742 for more detail.
+    client.setCurrentDatabase(db)
     client.alterPartitions(db, table, lowerCasedParts)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d3873cf..fbd7051 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -445,7 +445,7 @@ object SetWarehouseLocationTest extends Logging {
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
         "file:" + expectedWarehouseLocation.toString + "/testlocation"
-      val actualLocation = tableMetadata.storage.locationUri.get
+      val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
           s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
@@ -461,7 +461,7 @@ object SetWarehouseLocationTest extends Logging {
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
       val expectedLocation =
         "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
-      val actualLocation = tableMetadata.storage.locationUri.get
+      val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
           s"Expected table location is $expectedLocation. But, it is actually $actualLocation")

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index cfc1d81..9f4401a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     val expectedPath =
       spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
-    assert(metastoreTable.storage.locationUri.get === expectedPath)
+    assert(metastoreTable.location === expectedPath)
   }
 
   private def getTableNames(dbName: Option[String] = None): Array[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0076a77..6efae13 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -425,7 +425,7 @@ class HiveDDLSuite
     sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
     val part1 = Map("a" -> "1", "b" -> "5")
     val part2 = Map("a" -> "2", "b" -> "6")
-    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7461f3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c21db35..e607af6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -542,7 +542,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(r.catalogTable.storage.locationUri.get === location)
+            assert(r.catalogTable.location === location)
           case None => // OK.
         }
         // Also make sure that the format and serde are as desired.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message