spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject [1/3] spark git commit: [SPARK-23445] ColumnStat refactoring
Date Tue, 27 Feb 2018 07:37:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7ec83658f -> 8077bb04f


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1ee1d57..28c340a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -663,14 +663,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
 
-    // For datasource tables and hive serde tables created by spark 2.1 or higher,
-    // the data schema is stored in the table properties.
-    val schema = restoreTableMetadata(rawTable).schema
-
     // convert table statistics to properties so that we can persist them through hive client
     val statsProperties =
       if (stats.isDefined) {
-        statsToProperties(stats.get, schema)
+        statsToProperties(stats.get)
       } else {
         new mutable.HashMap[String, String]()
       }
@@ -1028,9 +1024,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     currentFullPath
   }
 
-  private def statsToProperties(
-      stats: CatalogStatistics,
-      schema: StructType): Map[String, String] = {
+  private def statsToProperties(stats: CatalogStatistics): Map[String, String] = {
 
     val statsProperties = new mutable.HashMap[String, String]()
     statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()
@@ -1038,11 +1032,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
     }
 
-    val colNameTypeMap: Map[String, DataType] =
-      schema.fields.map(f => (f.name, f.dataType)).toMap
     stats.colStats.foreach { case (colName, colStat) =>
-      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      colStat.toMap(colName).foreach { case (k, v) =>
+        // Fully qualified name used in table properties for a particular column stat.
+        // For example, for column "mycol", and "min" stat, this should return
+        // "spark.sql.statistics.colStats.mycol.min".
+        statsProperties += (STATISTICS_COL_STATS_PREFIX + k -> v)
       }
     }
 
@@ -1058,23 +1053,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     if (statsProps.isEmpty) {
       None
     } else {
+      val colStats = new mutable.HashMap[String, CatalogColumnStat]
+      val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+        case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+      }
 
-      val colStats = new mutable.HashMap[String, ColumnStat]
-
-      // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
-      // but given the number of columns it usually not enormous, this is probably OK as a start.
-      // If we want to map this a linear operation, we'd need a stronger contract between the
-      // naming convention used for serialization.
-      schema.foreach { field =>
-        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
-          // If "version" field is defined, then the column stat is defined.
-          val keyPrefix = columnStatKeyPropName(field.name, "")
-          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
-            (k.drop(keyPrefix.length), v)
-          }
-          ColumnStat.fromMap(table, field, colStatMap).foreach { cs =>
-            colStats += field.name -> cs
-          }
+      // Find all the column names by matching the KEY_VERSION properties for them.
+      colStatsProps.keys.filter {
+        k => k.endsWith(CatalogColumnStat.KEY_VERSION)
+      }.map { k =>
+        k.dropRight(CatalogColumnStat.KEY_VERSION.length + 1)
+      }.foreach { fieldName =>
+        // and for each, create a column stat.
+        CatalogColumnStat.fromMap(table, fieldName, colStatsProps).foreach { cs =>
+          colStats += fieldName -> cs
         }
       }
 
@@ -1093,14 +1085,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val rawTable = getRawTable(db, table)
 
-    // For datasource tables and hive serde tables created by spark 2.1 or higher,
-    // the data schema is stored in the table properties.
-    val schema = restoreTableMetadata(rawTable).schema
-
     // convert partition statistics to properties so that we can persist them through hive api
     val withStatsProps = lowerCasedParts.map { p =>
       if (p.stats.isDefined) {
-        val statsProperties = statsToProperties(p.stats.get, schema)
+        val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
       } else {
         p
@@ -1310,15 +1298,6 @@ object HiveExternalCatalog {
   val EMPTY_DATA_SCHEMA = new StructType()
     .add("col", "array<string>", nullable = true, comment = "from deserializer")
 
-  /**
-   * Returns the fully qualified name used in table properties for a particular column stat.
-   * For example, for column "mycol", and "min" stat, this should return
-   * "spark.sql.statistics.colStats.mycol.min".
-   */
-  private def columnStatKeyPropName(columnName: String, statKey: String): String = {
-    STATISTICS_COL_STATS_PREFIX + columnName + "." + statKey
-  }
-
   // A persisted data source table always store its schema in the catalog.
   private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     val errorMessage = "Could not read schema from the hive metastore because it is corrupted."

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 3af8af0..61cec82 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -177,8 +177,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats0 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
       assert(fetchedStats0.get.colStats == Map(
-        "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4),
-        "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4)))
+        "a" -> CatalogColumnStat(Some(2), Some("1"), Some("2"), Some(0), Some(4), Some(4)),
+        "b" -> CatalogColumnStat(Some(1), Some("1"), Some("1"), Some(0), Some(4), Some(4))))
     }
   }
 
@@ -208,8 +208,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats1 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats1.colStats == Map(
-        "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4)))
+        "C1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
     }
   }
 
@@ -596,7 +596,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
       val fetchedStats0 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))
+      assert(fetchedStats0.get.colStats ==
+        Map("c1" -> CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))))
 
       // Insert new data and analyze: have the latest column stats.
       sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
@@ -604,18 +605,18 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats1 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats1.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
 
       // Analyze another column: since the table is not changed, the precious column stats are kept.
       sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
       val fetchedStats2 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats2.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4),
-        "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
-          avgLen = 1, maxLen = 1)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c2" -> CatalogColumnStat(distinctCount = Some(1), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))))
 
       // Insert new data and analyze: stale column stats are removed and newly collected column
       // stats are added.
@@ -624,10 +625,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats3 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
       assert(fetchedStats3.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-          avgLen = 4, maxLen = 4),
-        "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
-          avgLen = 8, maxLen = 8)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(2), min = Some("1"), max = Some("2"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c3" -> CatalogColumnStat(distinctCount = Some(2), min = Some("10.0"), max = Some("20.0"),
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))))
     }
   }
 
@@ -999,115 +1000,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("verify serialized column stats after analyzing columns") {
     import testImplicits._
 
-    val tableName = "column_stats_test2"
+    val tableName = "column_stats_test_ser"
     // (data.head.productArity - 1) because the last column does not support stats collection.
     assert(stats.size == data.head.productArity - 1)
     val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
 
-    val expectedSerializedColStats = Map(
-      "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
-      "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
-      "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbinary.version" -> "1",
-      "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
-      "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbool.max" -> "true",
-      "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
-      "spark.sql.statistics.colStats.cbool.min" -> "false",
-      "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbool.version" -> "1",
-      "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
-      "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbyte.max" -> "2",
-      "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
-      "spark.sql.statistics.colStats.cbyte.min" -> "1",
-      "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbyte.version" -> "1",
-      "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
-      "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
-      "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdate.version" -> "1",
-      "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
-      "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
-      "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
-      "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
-      "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdecimal.version" -> "1",
-      "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
-      "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
-      "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
-      "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
-      "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdouble.version" -> "1",
-      "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
-      "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
-      "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cfloat.version" -> "1",
-      "spark.sql.statistics.colStats.cint.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cint.max" -> "4",
-      "spark.sql.statistics.colStats.cint.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cint.min" -> "1",
-      "spark.sql.statistics.colStats.cint.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cint.version" -> "1",
-      "spark.sql.statistics.colStats.clong.avgLen" -> "8",
-      "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.clong.max" -> "5",
-      "spark.sql.statistics.colStats.clong.maxLen" -> "8",
-      "spark.sql.statistics.colStats.clong.min" -> "1",
-      "spark.sql.statistics.colStats.clong.nullCount" -> "1",
-      "spark.sql.statistics.colStats.clong.version" -> "1",
-      "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
-      "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cshort.max" -> "3",
-      "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
-      "spark.sql.statistics.colStats.cshort.min" -> "1",
-      "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cshort.version" -> "1",
-      "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
-      "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
-      "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cstring.version" -> "1",
-      "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
-      "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
-      "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
-      "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
-      "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
-      "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
-    )
-
-    val expectedSerializedHistograms = Map(
-      "spark.sql.statistics.colStats.cbyte.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
-      "spark.sql.statistics.colStats.cshort.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
-      "spark.sql.statistics.colStats.cint.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
-      "spark.sql.statistics.colStats.clong.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
-      "spark.sql.statistics.colStats.cdouble.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
-      "spark.sql.statistics.colStats.cfloat.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
-      "spark.sql.statistics.colStats.cdecimal.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
-      "spark.sql.statistics.colStats.cdate.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
-      "spark.sql.statistics.colStats.ctimestamp.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
-    )
-
     def checkColStatsProps(expected: Map[String, String]): Unit = {
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
       val table = hiveClient.getTable("default", tableName)
@@ -1129,6 +1026,29 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("verify column stats can be deserialized from tblproperties") {
+    import testImplicits._
+
+    val tableName = "column_stats_test_de"
+    // (data.head.productArity - 1) because the last column does not support stats collection.
+    assert(stats.size == data.head.productArity - 1)
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Put in stats properties manually.
+      val table = getCatalogTable(tableName)
+      val newTable = table.copy(
+        properties = table.properties ++
+          expectedSerializedColStats ++ expectedSerializedHistograms +
+          ("spark.sql.statistics.totalSize" -> "1") /* totalSize always required */)
+      hiveClient.alterTable(newTable)
+
+      validateColStats(tableName, statsWithHgms)
+    }
+  }
+
   test("serialization and deserialization of histograms to/from hive metastore") {
     import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message