From wenc...@apache.org
Subject spark git commit: [SPARK-21237][SQL] Invalidate stats once table data is changed
Date Thu, 29 Jun 2017 03:32:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master 25c2edf6f -> 82e24912d


[SPARK-21237][SQL] Invalidate stats once table data is changed

## What changes were proposed in this pull request?

Invalidate Spark's statistics after the following data-changing commands (see the sketch after the list):

- InsertIntoHadoopFsRelationCommand
- InsertIntoHiveTable
- LoadDataCommand
- TruncateTableCommand
- AlterTableSetLocationCommand
- AlterTableDropPartitionCommand
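
A minimal sketch of the resulting behavior, adapted from the added test cases (assumes a running `SparkSession` named `spark`, e.g. in spark-shell; the table and column names are arbitrary):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

spark.sql("CREATE TABLE t (i INT, j STRING) USING PARQUET")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
// Statistics are now recorded in the catalog.
assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")).stats.isDefined)

// Any of the commands above invalidates them; here, an insert:
spark.sql("INSERT INTO TABLE t SELECT 1, 'abc'")
assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")).stats.isEmpty)
```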

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #18449 from wzhfy/removeStats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82e24912
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82e24912
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82e24912

Branch: refs/heads/master
Commit: 82e24912d6e15a9e4fbadd83da9a08d4f80a592b
Parents: 25c2edf
Author: wangzhenhua <wangzhenhua@huawei.com>
Authored: Thu Jun 29 11:32:29 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Jun 29 11:32:29 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |   3 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   4 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |   2 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   2 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |   2 +-
 .../command/AnalyzeColumnCommand.scala          |   4 +-
 .../execution/command/AnalyzeTableCommand.scala |  76 +----------
 .../sql/execution/command/CommandUtils.scala    | 102 +++++++++++++++
 .../spark/sql/execution/command/ddl.scala       |   9 +-
 .../spark/sql/execution/command/tables.scala    |   7 +
 .../InsertIntoHadoopFsRelationCommand.scala     |   5 +
 .../spark/sql/StatisticsCollectionSuite.scala   |  85 ++++++++++--
 .../apache/spark/sql/test/SQLTestUtils.scala    |  14 ++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  24 ++--
 .../hive/execution/InsertIntoHiveTable.scala    |   4 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 130 +++++++++++++++----
 16 files changed, 340 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 12ba5ae..0254b6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -160,7 +160,8 @@ abstract class ExternalCatalog
    */
   def alterTableSchema(db: String, table: String, schema: StructType): Unit
 
-  def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit
+  /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
+  def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
 
   def getTable(db: String, table: String): CatalogTable
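
(Editorial note: a minimal sketch of the changed `alterTableStats` signature, mirroring the updated `ExternalCatalogSuite`; the in-memory catalog and the "db2"."tbl1" table are illustrative assumptions:)

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, InMemoryCatalog}

val catalog = new InMemoryCatalog()  // sketch assumes "db2"."tbl1" has already been created in it

// Persist fresh statistics for the table:
catalog.alterTableStats("db2", "tbl1", Some(CatalogStatistics(sizeInBytes = 1)))

// Drop all existing statistics; this is the path CommandUtils.updateTableStats
// takes after a data-changing command:
catalog.alterTableStats("db2", "tbl1", None)
```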
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9820522..747190f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -315,10 +315,10 @@ class InMemoryCatalog(
   override def alterTableStats(
       db: String,
       table: String,
-      stats: CatalogStatistics): Unit = synchronized {
+      stats: Option[CatalogStatistics]): Unit = synchronized {
     requireTableExists(db, table)
     val origTable = catalog(db).tables(table).table
-    catalog(db).tables(table).table = origTable.copy(stats = Some(stats))
+    catalog(db).tables(table).table = origTable.copy(stats = stats)
   }
 
   override def getTable(db: String, table: String): CatalogTable = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cf02da8..7ece77d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -380,7 +380,7 @@ class SessionCatalog(
    * Alter Spark's statistics of an existing metastore table identified by the provided table
    * identifier.
    */
-  def alterTableStats(identifier: TableIdentifier, newStats: CatalogStatistics): Unit = {
+  def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
     val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(identifier.table)
     val tableIdentifier = TableIdentifier(table, Some(db))

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 557b097..c22d55f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -260,7 +260,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val oldTableStats = catalog.getTable("db2", "tbl1").stats
     assert(oldTableStats.isEmpty)
     val newStats = CatalogStatistics(sizeInBytes = 1)
-    catalog.alterTableStats("db2", "tbl1", newStats)
+    catalog.alterTableStats("db2", "tbl1", Some(newStats))
     val newTableStats = catalog.getTable("db2", "tbl1").stats
     assert(newTableStats.get == newStats)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index a6dc21b..fc3893e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -454,7 +454,7 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       val oldTableStats = catalog.getTableMetadata(tableId).stats
       assert(oldTableStats.isEmpty)
       val newStats = CatalogStatistics(sizeInBytes = 1)
-      catalog.alterTableStats(tableId, newStats)
+      catalog.alterTableStats(tableId, Some(newStats))
       val newTableStats = catalog.getTableMetadata(tableId).stats
       assert(newTableStats.get == newStats)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 2f273b6..6588993 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -42,7 +42,7 @@ case class AnalyzeColumnCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
-    val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
+    val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta)
 
     // Compute stats for each column
     val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
@@ -54,7 +54,7 @@ case class AnalyzeColumnCommand(
       // Newly computed column stats should override the existing ones.
       colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
 
-    sessionState.catalog.alterTableStats(tableIdentWithDB, statistics)
+    sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
 
     // Refresh the cached data source table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 13b8faf..d780ef4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -17,18 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.net.URI
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.internal.SessionState
 
 
 /**
@@ -46,7 +38,7 @@ case class AnalyzeTableCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
-    val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
+    val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
 
     val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
     val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
@@ -74,7 +66,7 @@ case class AnalyzeTableCommand(
     // Update the metastore if the above statistics of the table are different from those
     // recorded in the metastore.
     if (newStats.isDefined) {
-      sessionState.catalog.alterTableStats(tableIdentWithDB, newStats.get)
+      sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
       // Refresh the cached data source table in the catalog.
       sessionState.catalog.refreshTable(tableIdentWithDB)
     }
@@ -82,65 +74,3 @@ case class AnalyzeTableCommand(
     Seq.empty[Row]
   }
 }
-
-object AnalyzeTableCommand extends Logging {
-
-  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = {
-    if (catalogTable.partitionColumnNames.isEmpty) {
-      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
-    } else {
-      // Calculate table size as a sum of the visible partitions. See SPARK-21079
-      val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
-      partitions.map(p =>
-        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
-      ).sum
-    }
-  }
-
-  private def calculateLocationSize(
-      sessionState: SessionState,
-      tableId: TableIdentifier,
-      locationUri: Option[URI]): Long = {
-    // This method is mainly based on
-    // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-    // in Hive 0.13 (except that we do not use fs.getContentSummary).
-    // TODO: Generalize statistics collection.
-    // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-    // Can we use fs.getContentSummary in future?
-    // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-    // countFileSize to count the table size.
-    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
-
-    def calculateLocationSize(fs: FileSystem, path: Path): Long = {
-      val fileStatus = fs.getFileStatus(path)
-      val size = if (fileStatus.isDirectory) {
-        fs.listStatus(path)
-          .map { status =>
-            if (!status.getPath.getName.startsWith(stagingDir)) {
-              calculateLocationSize(fs, status.getPath)
-            } else {
-              0L
-            }
-          }.sum
-      } else {
-        fileStatus.getLen
-      }
-
-      size
-    }
-
-    locationUri.map { p =>
-      val path = new Path(p)
-      try {
-        val fs = path.getFileSystem(sessionState.newHadoopConf())
-        calculateLocationSize(fs, path)
-      } catch {
-        case NonFatal(e) =>
-          logWarning(
-            s"Failed to get the size of table ${tableId.table} in the " +
-              s"database ${tableId.database} because of ${e.toString}", e)
-          0L
-      }
-    }.getOrElse(0L)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
new file mode 100644
index 0000000..9239760
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -0,0 +1,102 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command
+
+import java.net.URI
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.internal.SessionState
+
+
+object CommandUtils extends Logging {
+
+  /** Change statistics after changing data by commands. */
+  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
+    if (table.stats.nonEmpty) {
+      val catalog = sparkSession.sessionState.catalog
+      catalog.alterTableStats(table.identifier, None)
+    }
+  }
+
+  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
+    if (catalogTable.partitionColumnNames.isEmpty) {
+      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
+    } else {
+      // Calculate table size as a sum of the visible partitions. See SPARK-21079
+      val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
+      partitions.map { p =>
+        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
+      }.sum
+    }
+  }
+
+  def calculateLocationSize(
+      sessionState: SessionState,
+      identifier: TableIdentifier,
+      locationUri: Option[URI]): Long = {
+    // This method is mainly based on
+    // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+    // in Hive 0.13 (except that we do not use fs.getContentSummary).
+    // TODO: Generalize statistics collection.
+    // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+    // Can we use fs.getContentSummary in future?
+    // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+    // countFileSize to count the table size.
+    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+    def getPathSize(fs: FileSystem, path: Path): Long = {
+      val fileStatus = fs.getFileStatus(path)
+      val size = if (fileStatus.isDirectory) {
+        fs.listStatus(path)
+          .map { status =>
+            if (!status.getPath.getName.startsWith(stagingDir)) {
+              getPathSize(fs, status.getPath)
+            } else {
+              0L
+            }
+          }.sum
+      } else {
+        fileStatus.getLen
+      }
+
+      size
+    }
+
+    locationUri.map { p =>
+      val path = new Path(p)
+      try {
+        val fs = path.getFileSystem(sessionState.newHadoopConf())
+        getPathSize(fs, path)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(
+            s"Failed to get the size of table ${identifier.table} in the " +
+              s"database ${identifier.database} because of ${e.toString}", e)
+          0L
+      }
+    }.getOrElse(0L)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 413f5f3..ac897c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -433,9 +433,11 @@ case class AlterTableAddPartitionCommand(
         sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(normalizedSpec, table.storage.copy(
-        locationUri = location.map(CatalogUtils.stringToURI(_))))
+        locationUri = location.map(CatalogUtils.stringToURI)))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
+
+    CommandUtils.updateTableStats(sparkSession, table)
     Seq.empty[Row]
   }
 
@@ -519,6 +521,9 @@ case class AlterTableDropPartitionCommand(
     catalog.dropPartitions(
       table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
+
+    CommandUtils.updateTableStats(sparkSession, table)
+
     Seq.empty[Row]
   }
 
@@ -768,6 +773,8 @@ case class AlterTableSetLocationCommand(
         // No partition spec is specified, so we set the location for the table itself
         catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
     }
+
+    CommandUtils.updateTableStats(sparkSession, table)
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index b937a8a..8ded106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -400,6 +400,7 @@ case class LoadDataCommand(
     // Refresh the metadata cache to ensure the data visible to the users
     catalog.refreshTable(targetTable.identifier)
 
+    CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
   }
 }
@@ -487,6 +488,12 @@ case class TruncateTableCommand(
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
     }
+
+    if (table.stats.nonEmpty) {
+      // empty table after truncation
+      val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
+      catalog.alterTableStats(tableName, Some(newStats))
+    }
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 00aa124..ab26f2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -161,6 +161,11 @@ case class InsertIntoHadoopFsRelationCommand(
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
       sparkSession.catalog.refreshByPath(outputPath.toString)
+
+      if (catalogTable.nonEmpty) {
+        CommandUtils.updateTableStats(sparkSession, catalogTable.get)
+      }
+
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 9824062..b031c52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -40,17 +40,6 @@ import org.apache.spark.sql.types._
 class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
   import testImplicits._
 
-  private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[CatalogStatistics] = {
-    val df = spark.table(tableName)
-    val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
-      assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
-      rel.catalogTable.get.stats
-    }
-    assert(stats.size == 1)
-    stats.head
-  }
-
   test("estimates the size of a limit 0 on outer join") {
     withTempView("test") {
       Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
@@ -96,11 +85,11 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
 
       // noscan won't count the number of rows
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
-      checkTableStats(tableName, expectedRowCount = None)
+      checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None)
 
       // without noscan, we count the number of rows
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
-      checkTableStats(tableName, expectedRowCount = Some(2))
+      checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2))
     }
   }
 
@@ -168,6 +157,60 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       assert(stats.simpleString == expectedString)
     }
   }
+
+  test("change stats after truncate command") {
+    val table = "change_stats_truncate_table"
+    withTable(table) {
+      spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
+      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(100))
+      assert(fetched1.get.sizeInBytes > 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      // truncate table command
+      sql(s"TRUNCATE TABLE $table")
+      val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetched2.get.sizeInBytes == 0)
+      assert(fetched2.get.colStats.isEmpty)
+    }
+  }
+
+  test("change stats after set location command") {
+    val table = "change_stats_set_location_table"
+    withTable(table) {
+      spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
+      val fetched1 = checkTableStats(
+        table, hasSizeInBytes = true, expectedRowCounts = Some(100))
+      assert(fetched1.get.sizeInBytes > 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      // set location command
+      withTempDir { newLocation =>
+        sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
+        checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+      }
+    }
+  }
+
+  test("change stats after insert command for datasource table") {
+    val table = "change_stats_insert_datasource_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetched1.get.sizeInBytes == 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      // insert into command
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+      checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+    }
+  }
+
 }
 
 
@@ -219,6 +262,22 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
   private val randomName = new Random(31)
 
+  def checkTableStats(
+      tableName: String,
+      hasSizeInBytes: Boolean,
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
+    val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
+    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
+      assert(stats.isDefined)
+      assert(stats.get.sizeInBytes >= 0)
+      assert(stats.get.rowCount === expectedRowCounts)
+    } else {
+      assert(stats.isEmpty)
+    }
+
+    stats
+  }
+
   /**
    * Compute column stats for the given DataFrame and compare it with colStats.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index f6d4773..d74a7cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -149,6 +149,7 @@ private[sql] trait SQLTestUtils
         .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -165,6 +166,19 @@ private[sql] trait SQLTestUtils
   }
 
   /**
+   * Creates the specified number of temporary directories, which is then passed to `f` and will be
+   * deleted after `f` returns.
+   */
+  protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = {
+    val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile)
+    try f(files) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      files.foreach(Utils.deleteRecursively)
+    }
+  }
+
+  /**
    * Drops functions after calling `f`. A function is represented by (functionName, isTemporary).
    */
   protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 6e7c475..2a17849 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -631,21 +631,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def alterTableStats(
       db: String,
       table: String,
-      stats: CatalogStatistics): Unit = withClient {
+      stats: Option[CatalogStatistics]): Unit = withClient {
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
 
     // convert table statistics to properties so that we can persist them through hive client
-    var statsProperties: Map[String, String] =
-      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
-    if (stats.rowCount.isDefined) {
-      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
-    }
-    val colNameTypeMap: Map[String, DataType] =
-      rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap
-    stats.colStats.foreach { case (colName, colStat) =>
-      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+    val statsProperties = new mutable.HashMap[String, String]()
+    if (stats.isDefined) {
+      statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString()
+      if (stats.get.rowCount.isDefined) {
+        statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString()
+      }
+      val colNameTypeMap: Map[String, DataType] =
+        rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap
+      stats.get.colStats.foreach { case (colName, colStat) =>
+        colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
+          statsProperties += (columnStatKeyPropName(colName, k) -> v)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 392b7cf..223d375 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -434,6 +434,8 @@ case class InsertIntoHiveTable(
     sparkSession.catalog.uncacheTable(table.identifier.quotedString)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
+    CommandUtils.updateTableStats(sparkSession, table)
+
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.

http://git-wip-us.apache.org/repos/asf/spark/blob/82e24912/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 64deb38..5fd266c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
 
   test("Hive serde tables should fallback to HDFS for size estimation") {
@@ -219,23 +221,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
-  private def checkTableStats(
-      tableName: String,
-      hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
-    val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
-
-    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
-      assert(stats.isDefined)
-      assert(stats.get.sizeInBytes > 0)
-      assert(stats.get.rowCount === expectedRowCounts)
-    } else {
-      assert(stats.isEmpty)
-    }
-
-    stats
-  }
-
   test("test table-level statistics for hive tables created in HiveExternalCatalog") {
     val textTable = "textTable"
     withTable(textTable) {
@@ -326,7 +311,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       descOutput: Seq[String],
       propKey: String): Option[BigInt] = {
     val str = descOutput
-      .filterNot(_.contains(HiveExternalCatalog.STATISTICS_PREFIX))
+      .filterNot(_.contains(STATISTICS_PREFIX))
       .filter(_.contains(propKey))
     if (str.isEmpty) {
       None
@@ -448,6 +433,103 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       "ALTER TABLE unset_prop_table UNSET TBLPROPERTIES ('prop1')")
   }
 
+  /**
+   * To see if stats exist, we need to check spark's stats properties instead of catalog
+   * statistics, because hive would change stats in metastore and thus change catalog statistics.
+   */
+  private def getStatsProperties(tableName: String): Map[String, String] = {
+    val hTable = hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName)
+    hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+  }
+
+  test("change stats after insert command for hive table") {
+    val table = s"change_stats_insert_hive_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (i int, j string)")
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetched1.get.sizeInBytes == 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      // insert into command
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+      assert(getStatsProperties(table).isEmpty)
+    }
+  }
+
+  test("change stats after load data command") {
+    val table = "change_stats_load_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetched1.get.sizeInBytes == 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      withTempDir { loadPath =>
+        // load data command
+        val file = new File(loadPath + "/data")
+        val writer = new PrintWriter(file)
+        writer.write("2,xyz")
+        writer.close()
+        sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
+        assert(getStatsProperties(table).isEmpty)
+      }
+    }
+  }
+
+  test("change stats after add/drop partition command") {
+    val table = "change_stats_part_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
+      // table has two partitions initially
+      for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
+        sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
+      }
+      // analyze to get initial stats
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+      assert(fetched1.get.sizeInBytes > 0)
+      assert(fetched1.get.colStats.size == 2)
+
+      withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+        val file1 = new File(dir1 + "/data")
+        val writer1 = new PrintWriter(file1)
+        writer1.write("1,a")
+        writer1.close()
+
+        val file2 = new File(dir2 + "/data")
+        val writer2 = new PrintWriter(file2)
+        writer2.write("1,a")
+        writer2.close()
+
+        // add partition command
+        sql(
+          s"""
+             |ALTER TABLE $table ADD
+             |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
+             |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
+        """.stripMargin)
+        assert(getStatsProperties(table).isEmpty)
+
+        // generate stats again
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+        val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4))
+        assert(fetched2.get.sizeInBytes > 0)
+        assert(fetched2.get.colStats.size == 2)
+
+        // drop partition command
+        sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
+        // only one partition left
+        assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+          .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
+        assert(getStatsProperties(table).isEmpty)
+      }
+    }
+  }
+
   test("add/drop partitions - managed table") {
     val catalog = spark.sessionState.catalog
     val managedTable = "partitionedTable"
@@ -483,23 +565,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       assert(catalog.listPartitions(TableIdentifier(managedTable)).map(_.spec).toSet ==
         Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
 
-      val stats2 = checkTableStats(
-        managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4))
-      assert(stats1 == stats2)
-
       sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS")
 
-      val stats3 = checkTableStats(
+      val stats2 = checkTableStats(
         managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1))
-      assert(stats2.get.sizeInBytes > stats3.get.sizeInBytes)
+      assert(stats1.get.sizeInBytes > stats2.get.sizeInBytes)
 
       sql(s"ALTER TABLE $managedTable ADD PARTITION (ds='2008-04-08', hr='12')")
       sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS")
       val stats4 = checkTableStats(
         managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1))
 
-      assert(stats2.get.sizeInBytes > stats4.get.sizeInBytes)
-      assert(stats4.get.sizeInBytes == stats3.get.sizeInBytes)
+      assert(stats1.get.sizeInBytes > stats4.get.sizeInBytes)
+      assert(stats4.get.sizeInBytes == stats2.get.sizeInBytes)
     }
   }
 

