spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping
Date Thu, 12 May 2016 18:14:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master 470de743e -> be617f3d0


[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping

#### What changes were proposed in this pull request?
~~Currently, a single DDL command, `ALTER TABLE ... DROP PARTITION`, can drop multiple partitions. However, the internal implementation could break atomicity: if an exception is hit while dropping one of the qualified partitions, only a subset of them may end up dropped.~~

~~This PR contains the following behavior changes:~~
- ~~disallow dropping multiple partitions with a single command~~
- ~~allow users to input predicates in the partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~
- ~~verify the partition spec in SessionCatalog. This ensures that each partition spec in `DROP PARTITION` does not correspond to multiple partitions.~~

This PR has two major parts:
- Verify the partition spec in `SessionCatalog` to fix the following issue:
  ```scala
  sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
  ```
  The above example uses an invalid partition spec. Without this PR, it drops all the partitions, because the Hive metastore's `getPartitions` API returns every partition when given an invalid spec.

- Re-implement `dropPartitions` in `HiveClientImpl`. We now always check that all the user-specified partition specs exist before attempting to drop any partition. Previously, we started dropping partitions before we had finished checking the existence of all the partition specs. If a failure happens after dropping has started, we log an error message that indicates which partitions have been dropped and which have not.
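The two checks this PR adds to `SessionCatalog` (an exact match for create, get, rename and alter; a partial match for drop) can be sketched in isolation. This is a minimal standalone sketch, assuming partition specs are plain `Map[String, String]` values, as `TablePartitionSpec` is in Spark. The object and method names are hypothetical, and `IllegalArgumentException` stands in for Spark's `AnalysisException` so the snippet runs without Spark.

```scala
// Hypothetical standalone sketch of the exact/partial partition-spec checks.
// IllegalArgumentException stands in for Spark's AnalysisException.
object PartitionSpecCheck {
  type TablePartitionSpec = Map[String, String]

  // Exact match: the spec must name every partition column and nothing else.
  // Column order does not matter, so both sides are sorted before comparing.
  def requireExactMatch(spec: TablePartitionSpec, partitionCols: Seq[String]): Unit = {
    if (spec.keys.toSeq.sorted != partitionCols.sorted) {
      throw new IllegalArgumentException(
        s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must match " +
          s"the partition spec (${partitionCols.mkString(", ")})")
    }
  }

  // Partial match: every column in the spec must be a partition column, but
  // some partition columns may be omitted (e.g. DROP PARTITION (ds='...')).
  def requirePartialMatch(spec: TablePartitionSpec, partitionCols: Seq[String]): Unit = {
    if (!spec.keys.forall(partitionCols.contains)) {
      throw new IllegalArgumentException(
        s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must be " +
          s"contained within the partition spec (${partitionCols.mkString(", ")})")
    }
  }

  // Helper for the demo: true if the check passes, false if it throws.
  def isValid(check: => Unit): Boolean =
    try { check; true } catch { case _: IllegalArgumentException => false }

  def main(args: Array[String]): Unit = {
    val cols = Seq("ds", "hr")
    // Column order is irrelevant for an exact match.
    println(isValid(requireExactMatch(Map("hr" -> "11", "ds" -> "2008-04-08"), cols))) // true
    // A spec naming only some partition columns is fine for DROP, not for GET/ALTER.
    println(isValid(requirePartialMatch(Map("ds" -> "2008-04-08"), cols))) // true
    println(isValid(requireExactMatch(Map("ds" -> "2008-04-08"), cols))) // false
    // The spec from the bug report is rejected instead of matching every partition.
    println(isValid(requirePartialMatch(Map("ds" -> "2008-04-09", "unknownCol" -> "12"), cols))) // false
  }
}
```

Keeping two variants mirrors the diff: `DROP PARTITION` accepts a spec such as `(ds='2008-04-08')` that selects several partitions, while the other operations must address exactly one partition.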

#### How was this patch tested?
Modified the existing test cases and added new test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12801 from gatorsmile/banDropMultiPart.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be617f3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be617f3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be617f3d

Branch: refs/heads/master
Commit: be617f3d0695982f982006fdd79afe3e3730b4c4
Parents: 470de74
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Thu May 12 11:14:40 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu May 12 11:14:40 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  47 +++++++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   6 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 116 ++++++++++++++++++-
 .../spark/sql/execution/command/DDLSuite.scala  |  78 ++++++-------
 .../spark/sql/hive/client/HiveClientImpl.scala  |  50 +++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   9 +-
 6 files changed, 248 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab5..54b30d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
   def dropPartitions(
       tableName: TableIdentifier,
-      parts: Seq[TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
   }
 
   /**
@@ -542,6 +544,9 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
+    val tableMetadata = getTableMetadata(tableName)
+    requireExactMatchedPartitionSpec(specs, tableMetadata)
+    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -559,6 +564,7 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -571,6 +577,7 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -595,6 +602,42 @@ class SessionCatalog(
     externalCatalog.listPartitions(db, table, partialSpec)
   }
 
+  /**
+   * Verify if the input partition spec exactly matches the existing defined partition spec
+   * The columns must be the same but the orders could be different.
+   */
+  private def requireExactMatchedPartitionSpec(
+      specs: Seq[TablePartitionSpec],
+      table: CatalogTable): Unit = {
+    val defined = table.partitionColumnNames.sorted
+    specs.foreach { s =>
+      if (s.keys.toSeq.sorted != defined) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
+            s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
+            s"table '${table.identifier}'")
+      }
+    }
+  }
+
+  /**
+   * Verify if the input partition spec partially matches the existing defined partition spec
+   * That is, the columns of partition spec should be part of the defined partition spec.
+   */
+  private def requirePartialMatchedPartitionSpec(
+      specs: Seq[TablePartitionSpec],
+      table: CatalogTable): Unit = {
+    val defined = table.partitionColumnNames
+    specs.foreach { s =>
+      if (!s.keys.forall(defined.contains)) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
+            s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
+            s"in table '${table.identifier}'")
+      }
+    }
+  }
+
   // ----------------------------------------------------------------------------
   // Functions
   // ----------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae190c0..377e64b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -627,6 +627,12 @@ abstract class CatalogTestUtils {
   lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
   lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
   lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+  lazy val partWithMixedOrder = CatalogTablePartition(Map("b" -> "6", "a" -> "6"), storageFormat)
+  lazy val partWithLessColumns = CatalogTablePartition(Map("a" -> "1"), storageFormat)
+  lazy val partWithMoreColumns =
+    CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
+  lazy val partWithUnknownColumns =
+    CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 726b7a1..91e2e07 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -482,8 +482,10 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
     // Create partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3)))
+    sessionCatalog.createPartitions(
+      TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(
+      externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
   }
 
   test("create partitions when database/table does not exist") {
@@ -508,6 +510,31 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
   }
 
+  test("create partitions with invalid part spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part1, partWithLessColumns), ignoreIfExists = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part1, partWithMoreColumns), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+  }
+
   test("drop partitions") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
@@ -565,6 +592,28 @@ class SessionCatalogSuite extends SparkFunSuite {
       ignoreIfNotExists = true)
   }
 
+  test("drop partitions with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithMoreColumns.spec),
+        ignoreIfNotExists = false)
+    }
+    assert(e.getMessage.contains(
+      "Partition spec is invalid. The spec (a, b, c) must be contained within " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithUnknownColumns.spec),
+        ignoreIfNotExists = false)
+    }
+    assert(e.getMessage.contains(
+      "Partition spec is invalid. The spec (a, unknown) must be contained within " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+  }
+
   test("get partition") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.getPartition(
@@ -591,6 +640,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("get partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("rename partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
@@ -633,6 +701,31 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("rename partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithLessColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("alter partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val newLocation = newUriForDatabase()
@@ -673,6 +766,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("alter partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))

http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 64b90b1..82123be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -88,10 +88,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         CatalogColumn("col1", "int"),
         CatalogColumn("col2", "string"),
         CatalogColumn("a", "int"),
-        CatalogColumn("b", "int"),
-        CatalogColumn("c", "int"),
-        CatalogColumn("d", "int")),
-      partitionColumnNames = Seq("a", "b", "c", "d"),
+        CatalogColumn("b", "int")),
+      partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
 
@@ -563,9 +561,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("alter table: rename partition") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
+    val part1 = Map("a" -> "1", "b" -> "q")
+    val part2 = Map("a" -> "2", "b" -> "c")
+    val part3 = Map("a" -> "3", "b" -> "p")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -573,22 +571,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createTablePartition(catalog, part3, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
       Set(part1, part2, part3))
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
-    sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='200', b='c')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
     // rename without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
     // table to alter does not exist
-    intercept[AnalysisException] {
+    intercept[NoSuchTableException] {
       sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
     }
     // partition to rename does not exist
-    intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+    intercept[NoSuchPartitionException] {
+      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
     }
   }
 
@@ -729,7 +727,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def testSetLocation(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val partSpec = Map("a" -> "1")
+    val partSpec = Map("a" -> "1", "b" -> "2")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, partSpec, tableIdent)
@@ -762,7 +760,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     verifyLocation("/path/to/your/lovely/heart")
     // set table partition location
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
     }
     verifyLocation("/path/to/part/ways", Some(partSpec))
     // set table location without explicitly specifying database
@@ -771,7 +769,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     verifyLocation("/swanky/steak/place")
     // set table partition location without explicitly specifying database
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+      sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
     }
     verifyLocation("vienna", Some(partSpec))
     // table to alter does not exist
@@ -833,10 +831,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
-    val part4 = Map("d" -> "4")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val part3 = Map("a" -> "3", "b" -> "7")
+    val part4 = Map("a" -> "4", "b" -> "8")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -846,18 +844,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
-        "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+        "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
       assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
-      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
       assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
     }
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -865,14 +863,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
     // table to alter does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
     }
     // partition to add already exists
     intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -883,10 +881,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
-    val part4 = Map("d" -> "4")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val part3 = Map("a" -> "3", "b" -> "7")
+    val part4 = Map("a" -> "4", "b" -> "8")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -899,7 +897,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
@@ -907,24 +905,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
     }
     if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
     // table to alter does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")
     }
     // partition to drop does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+      sql("ALTER TABLE tab1 DROP PARTITION (a='300')")
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
     }
     if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb32459..78c457b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{File, PrintStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.conf.Configuration
@@ -405,20 +406,43 @@ private[hive] class HiveClientImpl(
       ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
-    specs.foreach { s =>
-      // The provided spec here can be a partial spec, i.e. it will match all partitions
-      // whose specs are supersets of this partial spec. E.g. If a table has partitions
-      // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
-      val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
-      if (matchingParts.isEmpty && !ignoreIfNotExists) {
-        throw new AnalysisException(
-          s"partition to drop '$s' does not exist in table '$table' database '$db'")
-      }
-      matchingParts.foreach { hivePartition =>
-        val dropOptions = new PartitionDropOptions
-        dropOptions.ifExists = ignoreIfNotExists
-        client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+    // do the check at first and collect all the matching partitions
+    val matchingParts =
+      specs.flatMap { s =>
+        // The provided spec here can be a partial spec, i.e. it will match all partitions
+        // whose specs are supersets of this partial spec. E.g. If a table has partitions
+        // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+        val parts = client.getPartitions(hiveTable, s.asJava).asScala
+        if (parts.isEmpty && !ignoreIfNotExists) {
+          throw new AnalysisException(
+            s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
+            s"database '$db'")
+        }
+        parts.map(_.getValues)
+      }.distinct
+    var droppedParts = ArrayBuffer.empty[java.util.List[String]]
+    matchingParts.foreach { partition =>
+      val dropOptions = new PartitionDropOptions
+      dropOptions.ifExists = ignoreIfNotExists
+      try {
+        client.dropPartition(db, table, partition, dropOptions)
+      } catch {
+        case e: Exception =>
+          val remainingParts = matchingParts.toBuffer -- droppedParts
+          logError(
+            s"""
+               |======================
+               |Attempt to drop the partition specs in table '$table' database '$db':
+               |${specs.mkString("\n")}
+               |In this attempt, the following partitions have been dropped successfully:
+               |${droppedParts.mkString("\n")}
+               |The remaining partitions have not been dropped:
+               |${remainingParts.mkString("\n")}
+               |======================
+             """.stripMargin)
+          throw e
       }
+      droppedParts += partition
     }
   }
 

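The two-phase structure of the rewritten `dropPartitions` above (resolve and validate every spec first, then drop while tracking progress) can be sketched without Hive. This is a minimal standalone sketch: `FakeMetastore`, its methods, and `CheckThenDrop` are hypothetical stand-ins, not Spark or Hive APIs. The fake's partial-spec lookup treats a spec column that a partition lacks as a non-match, so it does not reproduce the metastore's invalid-spec behavior described in the PR text; that case is handled by the `SessionCatalog` check.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a metastore client (not a Spark or Hive API).
// `failOn` simulates a drop that fails partway through a multi-partition drop.
class FakeMetastore(initial: Seq[Map[String, String]], failOn: Option[Map[String, String]] = None) {
  val partitions: mutable.LinkedHashSet[Map[String, String]] = mutable.LinkedHashSet(initial: _*)

  // Partial-spec lookup: a partition matches if it has every (column, value) pair of the spec.
  def getPartitions(spec: Map[String, String]): Seq[Map[String, String]] =
    partitions.toSeq.filter(p => spec.forall { case (k, v) => p.get(k).contains(v) })

  def dropPartition(part: Map[String, String]): Unit = {
    if (failOn.contains(part)) throw new RuntimeException(s"simulated failure dropping $part")
    partitions -= part
  }
}

object CheckThenDrop {
  def dropPartitions(
      client: FakeMetastore,
      specs: Seq[Map[String, String]],
      ignoreIfNotExists: Boolean): Unit = {
    // Phase 1: resolve every spec before touching anything, so a missing
    // spec aborts the command with no partition dropped.
    val matchingParts = specs.flatMap { s =>
      val parts = client.getPartitions(s)
      if (parts.isEmpty && !ignoreIfNotExists) {
        throw new NoSuchElementException(
          s"No partition is dropped. Partition spec '$s' does not exist")
      }
      parts
    }.distinct // overlapping partial specs may match the same partition

    // Phase 2: drop one partition at a time; on failure, report progress and rethrow.
    val dropped = mutable.ArrayBuffer.empty[Map[String, String]]
    matchingParts.foreach { part =>
      try client.dropPartition(part)
      catch {
        case e: Exception =>
          val remaining = matchingParts.filterNot(dropped.contains)
          System.err.println(
            s"Dropped: ${dropped.mkString("; ")}\nNot dropped: ${remaining.mkString("; ")}")
          throw e
      }
      dropped += part
    }
  }

  def main(args: Array[String]): Unit = {
    val all = for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12"))
      yield Map("ds" -> ds, "hr" -> hr)

    // A missing spec aborts before anything is dropped.
    val client = new FakeMetastore(all)
    try dropPartitions(client, Seq(Map("ds" -> "2008-04-08"), Map("ds" -> "2099-01-01")),
      ignoreIfNotExists = false)
    catch { case e: NoSuchElementException => println(e.getMessage) }
    println(client.partitions.size) // 4

    // Overlapping partial specs are de-duplicated, then dropped.
    dropPartitions(client, Seq(Map("ds" -> "2008-04-08"), Map("hr" -> "12")),
      ignoreIfNotExists = false)
    println(client.partitions.toList) // List(Map(ds -> 2008-04-09, hr -> 11))
  }
}
```

The second call mirrors the `HiveDDLSuite` change in this commit: dropping `(ds='2008-04-08')` together with `(hr='12')` leaves only the `ds=2008-04-09, hr=11` partition, and the `.distinct` keeps the partition matched by both specs from being dropped twice.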
http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ae61322..e2cef38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -184,10 +184,17 @@ class HiveDDLSuite
         // After data insertion, all the directory are not empty
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
+        val message = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
+        }
+        assert(message.getMessage.contains(
+          "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
+            "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+
         sql(
           s"""
              |ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-08'),
-             |PARTITION (ds='2008-04-09', hr='12')
+             |PARTITION (hr='12')
           """.stripMargin)
         assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
           Set(Map("ds" -> "2008-04-09", "hr" -> "11")))

