spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2
Date Fri, 27 May 2016 02:01:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3ac2363d7 -> 008a5377d


[SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2

## What changes were proposed in this pull request?

Two more changes:
(1) Fix truncate table for data source tables (only for cases without `PARTITION`)
(2) Disallow truncating external tables or views

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13315 from andrewor14/truncate-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/008a5377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/008a5377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/008a5377

Branch: refs/heads/master
Commit: 008a5377d57ce6692eca4a41539fb27978b58e01
Parents: 3ac2363
Author: Andrew Or <andrew@databricks.com>
Authored: Thu May 26 19:01:41 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu May 26 19:01:41 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala    | 78 +++++++++++++-------
 .../spark/sql/execution/command/DDLSuite.scala  | 34 +++++++++
 2 files changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/008a5377/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bef4c92..e34beec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -285,41 +285,67 @@ case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
-    } else if (catalog.isTemporaryTable(tableName)) {
+    }
+    if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
-    } else {
-      val locations = if (partitionSpec.isDefined) {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+    }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+    }
+    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
+    if (isDatasourceTable && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables created using the data sources API: '$tableName'")
+    }
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables that are not partitioned: '$tableName'")
+    }
+    val locations =
+      if (isDatasourceTable || table.partitionColumnNames.isEmpty) {
+        Seq(table.storage.locationUri)
       } else {
-        val table = catalog.getTableMetadata(tableName)
-        if (table.partitionColumnNames.nonEmpty) {
-          catalog.listPartitions(tableName).map(_.storage.locationUri)
-        } else {
-          Seq(table.storage.locationUri)
-        }
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
       }
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        if (location.isDefined) {
-          val path = new Path(location.get)
-          try {
-            val fs = path.getFileSystem(hadoopConf)
-            fs.delete(path, true)
-            fs.mkdirs(path)
-          } catch {
-            case NonFatal(e) =>
-              throw new AnalysisException(
-                s"Failed to truncate table '$tableName' when removing data of the path: $path
" +
-                  s"because of ${e.toString}")
-          }
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    locations.foreach { location =>
+      if (location.isDefined) {
+        val path = new Path(location.get)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table '$tableName' when removing data of the path: $path
" +
+                s"because of ${e.toString}")
         }
       }
     }
+    // After deleting the data, invalidate the table to make sure we don't keep around a
stale
+    // file relation in the metastore cache.
+    spark.sessionState.invalidateTable(tableName.unquotedString)
+    // Also try to drop the contents of the table from the columnar cache
+    try {
+      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+    }
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/008a5377/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bddd3f2..6c038c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -1109,4 +1110,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach
{
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    data.write.saveAsTable("rectangles")
+    spark.catalog.cacheTable("rectangles")
+    assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin
with")
+    assume(spark.catalog.isCached("rectangles"), "bad test; table was not cached to begin
with")
+    sql("TRUNCATE TABLE rectangles")
+    assert(spark.table("rectangles").collect().isEmpty)
+    assert(!spark.catalog.isCached("rectangles"))
+    // truncating partitioned data source tables is not supported
+    data.write.partitionBy("length").saveAsTable("rectangles2")
+    assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+  }
+
+  test("truncate table - external table, temporary table, view (not allowed)") {
+    import testImplicits._
+    val path = Utils.createTempDir().getAbsolutePath
+    (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
+    sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
+    sql(s"CREATE VIEW my_view AS SELECT 1")
+    assertUnsupported("TRUNCATE TABLE my_temp_tab")
+    assertUnsupported("TRUNCATE TABLE my_ext_tab")
+    assertUnsupported("TRUNCATE TABLE my_view")
+  }
+
+  test("truncate table - non-partitioned table (not allowed)") {
+    sql("CREATE TABLE my_tab (age INT, name STRING)")
+    assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message