From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-21237][SQL] Invalidate stats once table data is changed
Date: Thu, 29 Jun 2017 03:32:33 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 25c2edf6f -> 82e24912d


[SPARK-21237][SQL] Invalidate stats once table data is changed

## What changes were proposed in this pull request?

Invalidate Spark's statistics after the following data-changing commands:

- InsertIntoHadoopFsRelationCommand
- InsertIntoHiveTable
- LoadDataCommand
- TruncateTableCommand
- AlterTableSetLocationCommand
- AlterTableDropPartitionCommand

## How was this patch tested?

Added test cases.

Author: wangzhenhua

Closes #18449 from wzhfy/removeStats.
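To make the intent concrete before the diff: once any of the commands above rewrites a table's data, the size and row-count statistics previously recorded by ANALYZE no longer describe the table, so the patch removes them rather than let the optimizer plan against stale numbers. A minimal sketch of that pattern, simplified from the `CommandUtils.updateTableStats` helper introduced below (the standalone method name is illustrative, and reaching the catalog through `sessionState` relies on Spark-internal API):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Simplified sketch of the invalidation pattern: if the table has Spark statistics
// recorded in the catalog, drop them after a data-changing command so that later
// planning re-derives the size from files or waits for a fresh ANALYZE TABLE.
def invalidateStatsAfterDataChange(spark: SparkSession, table: CatalogTable): Unit = {
  if (table.stats.nonEmpty) {
    // Passing None asks the catalog to remove all existing statistics for this table.
    spark.sessionState.catalog.alterTableStats(table.identifier, None)
  }
}
```

TruncateTableCommand is the one exception in the diff: instead of dropping the statistics it rewrites them as sizeInBytes = 0 and rowCount = 0, since the table is known to be empty afterwards.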
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82e24912
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82e24912
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82e24912

Branch: refs/heads/master
Commit: 82e24912d6e15a9e4fbadd83da9a08d4f80a592b
Parents: 25c2edf
Author: wangzhenhua
Authored: Thu Jun 29 11:32:29 2017 +0800
Committer: Wenchen Fan
Committed: Thu Jun 29 11:32:29 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |   3 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   4 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |   2 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   2 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |   2 +-
 .../command/AnalyzeColumnCommand.scala          |   4 +-
 .../execution/command/AnalyzeTableCommand.scala |  76 +----------
 .../sql/execution/command/CommandUtils.scala    | 102 +++++++++++++++
 .../spark/sql/execution/command/ddl.scala       |   9 +-
 .../spark/sql/execution/command/tables.scala    |   7 +
 .../InsertIntoHadoopFsRelationCommand.scala     |   5 +
 .../spark/sql/StatisticsCollectionSuite.scala   |  85 ++++++++++--
 .../apache/spark/sql/test/SQLTestUtils.scala    |  14 ++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  24 ++--
 .../hive/execution/InsertIntoHiveTable.scala    |   4 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 130 +++++++++++++++----
 16 files changed, 340 insertions(+), 133 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 12ba5ae..0254b6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -160,7 +160,8 @@ abstract class ExternalCatalog */ def alterTableSchema(db: String, table: String, schema: StructType): Unit - def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit + /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics.
*/ + def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit def getTable(db: String, table: String): CatalogTable http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 9820522..747190f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -315,10 +315,10 @@ class InMemoryCatalog( override def alterTableStats( db: String, table: String, - stats: CatalogStatistics): Unit = synchronized { + stats: Option[CatalogStatistics]): Unit = synchronized { requireTableExists(db, table) val origTable = catalog(db).tables(table).table - catalog(db).tables(table).table = origTable.copy(stats = Some(stats)) + catalog(db).tables(table).table = origTable.copy(stats = stats) } override def getTable(db: String, table: String): CatalogTable = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index cf02da8..7ece77d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -380,7 +380,7 @@ class SessionCatalog( * Alter Spark's statistics of an existing metastore table identified by the provided table * identifier. 
*/ - def alterTableStats(identifier: TableIdentifier, newStats: CatalogStatistics): Unit = { + def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = { val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(identifier.table) val tableIdentifier = TableIdentifier(table, Some(db)) http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 557b097..c22d55f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -260,7 +260,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val oldTableStats = catalog.getTable("db2", "tbl1").stats assert(oldTableStats.isEmpty) val newStats = CatalogStatistics(sizeInBytes = 1) - catalog.alterTableStats("db2", "tbl1", newStats) + catalog.alterTableStats("db2", "tbl1", Some(newStats)) val newTableStats = catalog.getTable("db2", "tbl1").stats assert(newTableStats.get == newStats) } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index a6dc21b..fc3893e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -454,7 +454,7 @@ abstract class SessionCatalogSuite extends AnalysisTest { val oldTableStats = catalog.getTableMetadata(tableId).stats assert(oldTableStats.isEmpty) val newStats = CatalogStatistics(sizeInBytes = 1) - catalog.alterTableStats(tableId, newStats) + catalog.alterTableStats(tableId, Some(newStats)) val newTableStats = catalog.getTableMetadata(tableId).stats assert(newTableStats.get == newStats) } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 2f273b6..6588993 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -42,7 +42,7 @@ case class AnalyzeColumnCommand( if (tableMeta.tableType == CatalogTableType.VIEW) { throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta) + val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta) // Compute stats for each column val (rowCount, 
newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames) @@ -54,7 +54,7 @@ case class AnalyzeColumnCommand( // Newly computed column stats should override the existing ones. colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) - sessionState.catalog.alterTableStats(tableIdentWithDB, statistics) + sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) // Refresh the cached data source table in the catalog. sessionState.catalog.refreshTable(tableIdentWithDB) http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 13b8faf..d780ef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -17,18 +17,10 @@ package org.apache.spark.sql.execution.command -import java.net.URI - -import scala.util.control.NonFatal - -import org.apache.hadoop.fs.{FileSystem, Path} - -import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.internal.SessionState /** @@ -46,7 +38,7 @@ case class AnalyzeTableCommand( if (tableMeta.tableType == CatalogTableType.VIEW) { throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta) + val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L) val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) @@ -74,7 +66,7 @@ case class AnalyzeTableCommand( // Update the metastore if the above statistics of the table are different from those // recorded in the metastore. if (newStats.isDefined) { - sessionState.catalog.alterTableStats(tableIdentWithDB, newStats.get) + sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) // Refresh the cached data source table in the catalog. sessionState.catalog.refreshTable(tableIdentWithDB) } @@ -82,65 +74,3 @@ case class AnalyzeTableCommand( Seq.empty[Row] } } - -object AnalyzeTableCommand extends Logging { - - def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = { - if (catalogTable.partitionColumnNames.isEmpty) { - calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) - } else { - // Calculate table size as a sum of the visible partitions. 
See SPARK-21079 - val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - partitions.map(p => - calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - ).sum - } - } - - private def calculateLocationSize( - sessionState: SessionState, - tableId: TableIdentifier, - locationUri: Option[URI]): Long = { - // This method is mainly based on - // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) - // in Hive 0.13 (except that we do not use fs.getContentSummary). - // TODO: Generalize statistics collection. - // TODO: Why fs.getContentSummary returns wrong size on Jenkins? - // Can we use fs.getContentSummary in future? - // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use - // countFileSize to count the table size. - val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") - - def calculateLocationSize(fs: FileSystem, path: Path): Long = { - val fileStatus = fs.getFileStatus(path) - val size = if (fileStatus.isDirectory) { - fs.listStatus(path) - .map { status => - if (!status.getPath.getName.startsWith(stagingDir)) { - calculateLocationSize(fs, status.getPath) - } else { - 0L - } - }.sum - } else { - fileStatus.getLen - } - - size - } - - locationUri.map { p => - val path = new Path(p) - try { - val fs = path.getFileSystem(sessionState.newHadoopConf()) - calculateLocationSize(fs, path) - } catch { - case NonFatal(e) => - logWarning( - s"Failed to get the size of table ${tableId.table} in the " + - s"database ${tableId.database} because of ${e.toString}", e) - 0L - } - }.getOrElse(0L) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala new file mode 100644 index 0000000..9239760 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -0,0 +1,102 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.command + +import java.net.URI + +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.internal.SessionState + + +object CommandUtils extends Logging { + + /** Change statistics after changing data by commands. 
*/ + def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = { + if (table.stats.nonEmpty) { + val catalog = sparkSession.sessionState.catalog + catalog.alterTableStats(table.identifier, None) + } + } + + def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = { + if (catalogTable.partitionColumnNames.isEmpty) { + calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) + } else { + // Calculate table size as a sum of the visible partitions. See SPARK-21079 + val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) + partitions.map { p => + calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) + }.sum + } + } + + def calculateLocationSize( + sessionState: SessionState, + identifier: TableIdentifier, + locationUri: Option[URI]): Long = { + // This method is mainly based on + // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) + // in Hive 0.13 (except that we do not use fs.getContentSummary). + // TODO: Generalize statistics collection. + // TODO: Why fs.getContentSummary returns wrong size on Jenkins? + // Can we use fs.getContentSummary in future? + // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use + // countFileSize to count the table size. + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + + def getPathSize(fs: FileSystem, path: Path): Long = { + val fileStatus = fs.getFileStatus(path) + val size = if (fileStatus.isDirectory) { + fs.listStatus(path) + .map { status => + if (!status.getPath.getName.startsWith(stagingDir)) { + getPathSize(fs, status.getPath) + } else { + 0L + } + }.sum + } else { + fileStatus.getLen + } + + size + } + + locationUri.map { p => + val path = new Path(p) + try { + val fs = path.getFileSystem(sessionState.newHadoopConf()) + getPathSize(fs, path) + } catch { + case NonFatal(e) => + logWarning( + s"Failed to get the size of table ${identifier.table} in the " + + s"database ${identifier.database} because of ${e.toString}", e) + 0L + } + }.getOrElse(0L) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 413f5f3..ac897c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -433,9 +433,11 @@ case class AlterTableAddPartitionCommand( sparkSession.sessionState.conf.resolver) // inherit table storage format (possibly except for location) CatalogTablePartition(normalizedSpec, table.storage.copy( - locationUri = location.map(CatalogUtils.stringToURI(_)))) + locationUri = location.map(CatalogUtils.stringToURI))) } catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) + + CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } @@ -519,6 +521,9 @@ case class AlterTableDropPartitionCommand( catalog.dropPartitions( table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) + + CommandUtils.updateTableStats(sparkSession, table) + Seq.empty[Row] } @@ -768,6 +773,8 @@ case class AlterTableSetLocationCommand( // No 
partition spec is specified, so we set the location for the table itself catalog.alterTable(table.withNewStorage(locationUri = Some(locUri))) } + + CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index b937a8a..8ded106 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -400,6 +400,7 @@ case class LoadDataCommand( // Refresh the metadata cache to ensure the data visible to the users catalog.refreshTable(targetTable.identifier) + CommandUtils.updateTableStats(sparkSession, targetTable) Seq.empty[Row] } } @@ -487,6 +488,12 @@ case class TruncateTableCommand( case NonFatal(e) => log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e) } + + if (table.stats.nonEmpty) { + // empty table after truncation + val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0)) + catalog.alterTableStats(tableName, Some(newStats)) + } Seq.empty[Row] } } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 00aa124..ab26f2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -161,6 +161,11 @@ case class InsertIntoHadoopFsRelationCommand( fileIndex.foreach(_.refresh()) // refresh data cache if table is cached sparkSession.catalog.refreshByPath(outputPath.toString) + + if (catalogTable.nonEmpty) { + CommandUtils.updateTableStats(sparkSession, catalogTable.get) + } + } else { logInfo("Skipping insertion into a relation that already exists.") } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 9824062..b031c52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -40,17 +40,6 @@ import org.apache.spark.sql.types._ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext { import testImplicits._ - private def checkTableStats(tableName: String, expectedRowCount: Option[Int]) - : Option[CatalogStatistics] = { - val df = spark.table(tableName) - val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation => - assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount) - 
rel.catalogTable.get.stats - } - assert(stats.size == 1) - stats.head - } - test("estimates the size of a limit 0 on outer join") { withTempView("test") { Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") @@ -96,11 +85,11 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared // noscan won't count the number of rows sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan") - checkTableStats(tableName, expectedRowCount = None) + checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None) // without noscan, we count the number of rows sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") - checkTableStats(tableName, expectedRowCount = Some(2)) + checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2)) } } @@ -168,6 +157,60 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared assert(stats.simpleString == expectedString) } } + + test("change stats after truncate command") { + val table = "change_stats_truncate_table" + withTable(table) { + spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table) + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value") + val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(100)) + assert(fetched1.get.sizeInBytes > 0) + assert(fetched1.get.colStats.size == 2) + + // truncate table command + sql(s"TRUNCATE TABLE $table") + val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(fetched2.get.sizeInBytes == 0) + assert(fetched2.get.colStats.isEmpty) + } + } + + test("change stats after set location command") { + val table = "change_stats_set_location_table" + withTable(table) { + spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table) + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value") + val fetched1 = checkTableStats( + table, hasSizeInBytes = true, expectedRowCounts = Some(100)) + assert(fetched1.get.sizeInBytes > 0) + assert(fetched1.get.colStats.size == 2) + + // set location command + withTempDir { newLocation => + sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'") + checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None) + } + } + } + + test("change stats after insert command for datasource table") { + val table = "change_stats_insert_datasource_table" + withTable(table) { + sql(s"CREATE TABLE $table (i int, j string) USING PARQUET") + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j") + val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(fetched1.get.sizeInBytes == 0) + assert(fetched1.get.colStats.size == 2) + + // insert into command + sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'") + checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None) + } + } + } @@ -219,6 +262,22 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils private val randomName = new Random(31) + def checkTableStats( + tableName: String, + hasSizeInBytes: Boolean, + expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { + val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats + if (hasSizeInBytes || expectedRowCounts.nonEmpty) { + assert(stats.isDefined) + assert(stats.get.sizeInBytes >= 0) + assert(stats.get.rowCount === 
expectedRowCounts) + } else { + assert(stats.isEmpty) + } + + stats + } + /** * Compute column stats for the given DataFrame and compare it with colStats. */ http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index f6d4773..d74a7cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -149,6 +149,7 @@ private[sql] trait SQLTestUtils .getExecutorInfos.map(_.numRunningTasks()).sum == 0) } } + /** * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` * returns. @@ -165,6 +166,19 @@ private[sql] trait SQLTestUtils } /** + * Creates the specified number of temporary directories, which is then passed to `f` and will be + * deleted after `f` returns. + */ + protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { + val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) + try f(files) finally { + // wait for all tasks to finish before deleting files + waitForTasksToFinish() + files.foreach(Utils.deleteRecursively) + } + } + + /** * Drops functions after calling `f`. A function is represented by (functionName, isTemporary). */ protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 6e7c475..2a17849 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -631,21 +631,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterTableStats( db: String, table: String, - stats: CatalogStatistics): Unit = withClient { + stats: Option[CatalogStatistics]): Unit = withClient { requireTableExists(db, table) val rawTable = getRawTable(db, table) // convert table statistics to properties so that we can persist them through hive client - var statsProperties: Map[String, String] = - Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) - if (stats.rowCount.isDefined) { - statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() - } - val colNameTypeMap: Map[String, DataType] = - rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap - stats.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => - statsProperties += (columnStatKeyPropName(colName, k) -> v) + val statsProperties = new mutable.HashMap[String, String]() + if (stats.isDefined) { + statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString() + if (stats.get.rowCount.isDefined) { + statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString() + } + val colNameTypeMap: Map[String, DataType] = + rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap + stats.get.colStats.foreach { case 
(colName, colStat) => + colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => + statsProperties += (columnStatKeyPropName(colName, k) -> v) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 392b7cf..223d375 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand} import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} @@ -434,6 +434,8 @@ case class InsertIntoHiveTable( sparkSession.catalog.uncacheTable(table.identifier.quotedString) sparkSession.sessionState.catalog.refreshTable(table.identifier) + CommandUtils.updateTableStats(sparkSession, table) + // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. 
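When statistics are later regenerated with ANALYZE TABLE, the `CommandUtils.calculateTotalSize` helper added earlier in this diff sums the bytes under the table (or each visible partition) location, skipping Hive staging directories so half-written output is not counted. A condensed, self-contained sketch of that traversal against Hadoop's FileSystem API (the hard-coded staging prefix stands in for the `hive.exec.stagingdir` setting the real code reads):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Condensed sketch of the directory walk behind CommandUtils.calculateLocationSize:
// sum file lengths under `path`, ignoring entries that look like Hive staging dirs.
def pathSize(fs: FileSystem, path: Path, stagingPrefix: String = ".hive-staging"): Long = {
  val status = fs.getFileStatus(path)
  if (status.isDirectory) {
    fs.listStatus(path).map { child =>
      if (child.getPath.getName.startsWith(stagingPrefix)) 0L
      else pathSize(fs, child.getPath, stagingPrefix)
    }.sum
  } else {
    status.getLen
  }
}

// Usage sketch against a hypothetical warehouse location.
val location = new Path("/user/hive/warehouse/some_table")
val fs = location.getFileSystem(new Configuration())
val totalBytes = pathSize(fs, location)
```

The real helper additionally swallows filesystem errors with a warning and falls back to 0, so a missing location never fails the command.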
http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 64deb38..5fd266c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ + class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton { test("Hive serde tables should fallback to HDFS for size estimation") { @@ -219,23 +221,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } - private def checkTableStats( - tableName: String, - hasSizeInBytes: Boolean, - expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { - val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats - - if (hasSizeInBytes || expectedRowCounts.nonEmpty) { - assert(stats.isDefined) - assert(stats.get.sizeInBytes > 0) - assert(stats.get.rowCount === expectedRowCounts) - } else { - assert(stats.isEmpty) - } - - stats - } - test("test table-level statistics for hive tables created in HiveExternalCatalog") { val textTable = "textTable" withTable(textTable) { @@ -326,7 +311,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto descOutput: Seq[String], propKey: String): Option[BigInt] = { val str = descOutput - .filterNot(_.contains(HiveExternalCatalog.STATISTICS_PREFIX)) + .filterNot(_.contains(STATISTICS_PREFIX)) .filter(_.contains(propKey)) if (str.isEmpty) { None @@ -448,6 +433,103 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto "ALTER TABLE unset_prop_table UNSET TBLPROPERTIES ('prop1')") } + /** + * To see if stats exist, we need to check spark's stats properties instead of catalog + * statistics, because hive would change stats in metastore and thus change catalog statistics. 
+ */ + private def getStatsProperties(tableName: String): Map[String, String] = { + val hTable = hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName) + hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) + } + + test("change stats after insert command for hive table") { + val table = s"change_stats_insert_hive_table" + withTable(table) { + sql(s"CREATE TABLE $table (i int, j string)") + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j") + val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(fetched1.get.sizeInBytes == 0) + assert(fetched1.get.colStats.size == 2) + + // insert into command + sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'") + assert(getStatsProperties(table).isEmpty) + } + } + + test("change stats after load data command") { + val table = "change_stats_load_table" + withTable(table) { + sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET") + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j") + val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(fetched1.get.sizeInBytes == 0) + assert(fetched1.get.colStats.size == 2) + + withTempDir { loadPath => + // load data command + val file = new File(loadPath + "/data") + val writer = new PrintWriter(file) + writer.write("2,xyz") + writer.close() + sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table") + assert(getStatsProperties(table).isEmpty) + } + } + } + + test("change stats after add/drop partition command") { + val table = "change_stats_part_table" + withTable(table) { + sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)") + // table has two partitions initially + for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) { + sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'") + } + // analyze to get initial stats + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j") + val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)) + assert(fetched1.get.sizeInBytes > 0) + assert(fetched1.get.colStats.size == 2) + + withTempPaths(numPaths = 2) { case Seq(dir1, dir2) => + val file1 = new File(dir1 + "/data") + val writer1 = new PrintWriter(file1) + writer1.write("1,a") + writer1.close() + + val file2 = new File(dir2 + "/data") + val writer2 = new PrintWriter(file2) + writer2.write("1,a") + writer2.close() + + // add partition command + sql( + s""" + |ALTER TABLE $table ADD + |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}' + |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}' + """.stripMargin) + assert(getStatsProperties(table).isEmpty) + + // generate stats again + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j") + val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4)) + assert(fetched2.get.sizeInBytes > 0) + assert(fetched2.get.colStats.size == 2) + + // drop partition command + sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')") + // only one partition left + assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table)) + .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11"))) + assert(getStatsProperties(table).isEmpty) + } + } + } + test("add/drop partitions - managed table") { val catalog = spark.sessionState.catalog val managedTable = 
"partitionedTable" @@ -483,23 +565,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalog.listPartitions(TableIdentifier(managedTable)).map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11"))) - val stats2 = checkTableStats( - managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4)) - assert(stats1 == stats2) - sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS") - val stats3 = checkTableStats( + val stats2 = checkTableStats( managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1)) - assert(stats2.get.sizeInBytes > stats3.get.sizeInBytes) + assert(stats1.get.sizeInBytes > stats2.get.sizeInBytes) sql(s"ALTER TABLE $managedTable ADD PARTITION (ds='2008-04-08', hr='12')") sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS") val stats4 = checkTableStats( managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1)) - assert(stats2.get.sizeInBytes > stats4.get.sizeInBytes) - assert(stats4.get.sizeInBytes == stats3.get.sizeInBytes) + assert(stats1.get.sizeInBytes > stats4.get.sizeInBytes) + assert(stats4.get.sizeInBytes == stats2.get.sizeInBytes) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org