From: andrewor14@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2
Date: Fri, 27 May 2016 02:01:48 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 3ac2363d7 -> 008a5377d


[SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2

## What changes were proposed in this pull request?

Two more changes:
(1) Fix truncate table for data source tables (only for cases without `PARTITION`)
(2) Disallow truncating external tables or views

## How was this patch tested?

`DDLSuite`

Author: Andrew Or

Closes #13315 from andrewor14/truncate-table.
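For quick orientation, a standalone sketch of the user-visible behavior after this patch. This is illustration only, not code from the commit: the table and view names are made up, and a local-mode SparkSession with default settings is assumed.

```scala
// Sketch only: demonstrates the behavior this patch introduces; not code from the patch.
// Assumes Spark 2.0-era APIs; "logs" and "logs_view" are hypothetical names.
import org.apache.spark.sql.{AnalysisException, SparkSession}

object TruncateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("truncate-demo").master("local[*]").getOrCreate()
    import spark.implicits._

    // (1) TRUNCATE TABLE now works on managed data source tables...
    Seq((1, "a"), (2, "b")).toDF("id", "name").write.saveAsTable("logs")
    spark.sql("TRUNCATE TABLE logs")
    assert(spark.table("logs").count() == 0)
    // ...though TRUNCATE TABLE ... PARTITION on a data source table is rejected.

    // (2) Truncating a view (or an external table) is now disallowed.
    spark.sql("CREATE VIEW logs_view AS SELECT * FROM logs")
    try {
      spark.sql("TRUNCATE TABLE logs_view")
    } catch {
      case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
    }
    spark.stop()
  }
}
```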
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/008a5377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/008a5377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/008a5377

Branch: refs/heads/master
Commit: 008a5377d57ce6692eca4a41539fb27978b58e01
Parents: 3ac2363
Author: Andrew Or
Authored: Thu May 26 19:01:41 2016 -0700
Committer: Andrew Or
Committed: Thu May 26 19:01:41 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala   | 78 +++++++++++++-------
 .../spark/sql/execution/command/DDLSuite.scala | 34 +++++++++
 2 files changed, 86 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/008a5377/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bef4c92..e34beec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -285,41 +285,67 @@ case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
-    } else if (catalog.isTemporaryTable(tableName)) {
+    }
+    if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
-    } else {
-      val locations = if (partitionSpec.isDefined) {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+    }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+    }
+    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
+    if (isDatasourceTable && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+          s"for tables created using the data sources API: '$tableName'")
+    }
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+          s"for tables that are not partitioned: '$tableName'")
+    }
+    val locations =
+      if (isDatasourceTable || table.partitionColumnNames.isEmpty) {
+        Seq(table.storage.locationUri)
       } else {
-        val table = catalog.getTableMetadata(tableName)
-        if (table.partitionColumnNames.nonEmpty) {
-          catalog.listPartitions(tableName).map(_.storage.locationUri)
-        } else {
-          Seq(table.storage.locationUri)
-        }
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
       }
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        if (location.isDefined) {
-          val path = new Path(location.get)
-          try {
-            val fs = path.getFileSystem(hadoopConf)
-            fs.delete(path, true)
-            fs.mkdirs(path)
-          } catch {
-            case NonFatal(e) =>
-              throw new AnalysisException(
-                s"Failed to truncate table '$tableName' when removing data of the path: $path " +
-                  s"because of ${e.toString}")
-          }
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    locations.foreach { location =>
+      if (location.isDefined) {
+        val path = new Path(location.get)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+                s"because of ${e.toString}")
        }
      }
    }
+    // After deleting the data, invalidate the table to make sure we don't keep around a stale
+    // file relation in the metastore cache.
+    spark.sessionState.invalidateTable(tableName.unquotedString)
+    // Also try to drop the contents of the table from the columnar cache
+    try {
+      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+    }
     Seq.empty[Row]
   }
 }
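Aside: the heart of the truncation in the hunk above is a generic Hadoop FileSystem delete-and-recreate of each data location. Below is a self-contained sketch of that pattern, not code from the patch; the location string in the usage comment is hypothetical, and the error handling mirrors the command's NonFatal catch.

```scala
// Standalone sketch of the delete-and-recreate pattern TruncateTableCommand applies
// to each table/partition location. Illustration only.
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object TruncateLocation {
  def truncate(location: String, hadoopConf: Configuration): Unit = {
    val path = new Path(location)
    try {
      val fs = path.getFileSystem(hadoopConf) // resolves the right FS: HDFS, local, S3A, ...
      fs.delete(path, true)                   // recursively remove the data directory
      fs.mkdirs(path)                         // recreate it empty so the table still resolves
    } catch {
      case NonFatal(e) =>
        throw new RuntimeException(s"Failed to truncate path $path: ${e.toString}", e)
    }
  }
}

// Usage (hypothetical location):
//   TruncateLocation.truncate("hdfs://namenode:8020/warehouse/rectangles", new Configuration())
```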
http://git-wip-us.apache.org/repos/asf/spark/blob/008a5377/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bddd3f2..6c038c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -1109,4 +1110,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    data.write.saveAsTable("rectangles")
+    spark.catalog.cacheTable("rectangles")
+    assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with")
+    assume(spark.catalog.isCached("rectangles"), "bad test; table was not cached to begin with")
+    sql("TRUNCATE TABLE rectangles")
+    assert(spark.table("rectangles").collect().isEmpty)
+    assert(!spark.catalog.isCached("rectangles"))
+    // truncating partitioned data source tables is not supported
+    data.write.partitionBy("length").saveAsTable("rectangles2")
+    assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+  }
+
+  test("truncate table - external table, temporary table, view (not allowed)") {
+    import testImplicits._
+    val path = Utils.createTempDir().getAbsolutePath
+    (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
+    sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
+    sql(s"CREATE VIEW my_view AS SELECT 1")
+    assertUnsupported("TRUNCATE TABLE my_temp_tab")
+    assertUnsupported("TRUNCATE TABLE my_ext_tab")
+    assertUnsupported("TRUNCATE TABLE my_view")
+  }
+
+  test("truncate table - non-partitioned table (not allowed)") {
+    sql("CREATE TABLE my_tab (age INT, name STRING)")
+    assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
+  }
+
 }
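Note for readers: the new tests call `assertUnsupported`, a DDLSuite helper that predates this diff and is not shown in it. A minimal sketch of what such a helper could look like, assuming ScalaTest's `intercept` and the suite's `sql` method; the exact message assertion is a guess, not copied from the suite.

```scala
// Hypothetical reconstruction of a helper like DDLSuite's assertUnsupported;
// the message check below is an assumption, not copied from the suite.
private def assertUnsupported(query: String): Unit = {
  val e = intercept[AnalysisException] {
    sql(query)
  }
  assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}
```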