spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-18545][SQL] Verify number of hive client RPCs in PartitionedTablePerfStatsSuite
Date Wed, 23 Nov 2016 12:20:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ebeb05140 -> 539c193af


[SPARK-18545][SQL] Verify number of hive client RPCs in PartitionedTablePerfStatsSuite

## What changes were proposed in this pull request?

This would help catch accidental O(n) calls to the hive client, as in https://issues.apache.org/jira/browse/SPARK-18507
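
To make the failure mode concrete: the O(n) pattern is one metastore RPC per partition where a single batched RPC would do. Below is a self-contained sketch, with all names invented for illustration (this is not the real Hive client API):

```scala
// Hypothetical sketch -- none of these names are actual Spark or Hive APIs.
trait MetastoreClient {
  def getPartition(table: String, name: String): String
  def getPartitionsByNames(table: String, names: Seq[String]): Seq[String]
}

// Counts one "RPC" per method call so the scaling difference is visible.
class CountingClient extends MetastoreClient {
  var rpcCount = 0
  def getPartition(table: String, name: String): String = {
    rpcCount += 1
    s"$table/$name"
  }
  def getPartitionsByNames(table: String, names: Seq[String]): Seq[String] = {
    rpcCount += 1
    names.map(n => s"$table/$n")
  }
}

object RpcScalingDemo extends App {
  val names = (0 until 100).map(i => s"partCol1=$i")

  val perPartition = new CountingClient
  names.foreach(perPartition.getPartition("test", _)) // accidental O(n): 100 RPCs

  val batched = new CountingClient
  batched.getPartitionsByNames("test", names)         // a single batched RPC

  println(s"per-partition: ${perPartition.rpcCount} RPCs, batched: ${batched.rpcCount} RPC")
}
```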

## How was this patch tested?

Checked that the test fails before https://issues.apache.org/jira/browse/SPARK-18507 was patched.
cc cloud-fan

Author: Eric Liang <ekl@databricks.com>

Closes #15985 from ericl/spark-18545.

(cherry picked from commit 85235ed6c600270e3fa434738bd50dce3564440a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/539c193a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/539c193a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/539c193a

Branch: refs/heads/branch-2.1
Commit: 539c193af7e3e08e9b48df15e94eafcc3532105c
Parents: ebeb051
Author: Eric Liang <ekl@databricks.com>
Authored: Wed Nov 23 20:14:08 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Nov 23 20:20:06 2016 +0800

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala    |  7 +++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  1 +
 .../hive/PartitionedTablePerfStatsSuite.scala   | 58 +++++++++++++++++++-
 3 files changed, 64 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/539c193a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index 3f7cfd9..b433cd0 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -86,16 +86,23 @@ object HiveCatalogMetrics extends Source {
   val METRIC_FILE_CACHE_HITS = metricRegistry.counter(MetricRegistry.name("fileCacheHits"))
 
   /**
+   * Tracks the total number of Hive client calls (e.g. to lookup a table).
+   */
+  val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
+
+  /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
   def reset(): Unit = {
     METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
+    METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
   def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
+  def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
 }
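
The intended consumption pattern for the new counter is reset-then-assert, as the test changes further down do. A minimal sketch of that pattern, assuming a Spark SQL test with a `spark` session and a 100-partition table named `test` already in scope:

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

HiveCatalogMetrics.reset() // zero all counters before the operation under test
spark.sql("select * from test where partCol1 = 1").count()

// The call count should be a small constant, independent of partition count.
val calls = HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount()
assert(calls > 0 && calls < 10)
```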

http://git-wip-us.apache.org/repos/asf/spark/blob/539c193a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index daae852..68dcfd8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -281,6 +281,7 @@ private[hive] class HiveClientImpl(
     shim.setCurrentSessionState(state)
     val ret = try f finally {
       Thread.currentThread().setContextClassLoader(original)
+      HiveCatalogMetrics.incrementHiveClientCalls(1)
     }
     ret
   }
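
Assuming, as this placement suggests, that every `HiveClientImpl` RPC is routed through this wrapper, incrementing inside the `finally` block counts each client call exactly once, whether the call succeeds or throws. The same pattern in miniature (hypothetical names):

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch of the count-in-finally pattern used above.
object CallCounter {
  val calls = new AtomicLong()

  def withCounting[A](body: => A): A =
    try body finally calls.incrementAndGet() // runs on success and on exception
}
```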

http://git-wip-us.apache.org/repos/asf/spark/blob/539c193a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b41bc86..9838b9a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -57,7 +57,11 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedHiveTable(tableName: String, dir: File): Unit = {
-    spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
+    setupPartitionedHiveTable(tableName, dir, 5)
+  }
+
+  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+    spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
@@ -71,7 +75,11 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
-    spark.range(5).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
+    setupPartitionedDatasourceTable(tableName, dir, 5)
+  }
+
+  private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+    spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
@@ -242,6 +250,52 @@ class PartitionedTablePerfStatsSuite
     }
   }
 
+  test("hive table: num hive client calls does not scale with partition count") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedHiveTable("test", dir, scale = 100)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 1").count() == 1)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() > 0)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 100)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("show partitions test").count() == 100)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+        }
+      }
+    }
+  }
+
+  test("datasource table: num hive client calls does not scale with partition count") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir, scale = 100)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 1").count() == 1)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() > 0)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 100)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("show partitions test").count() == 100)
+          assert(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount() < 10)
+        }
+      }
+    }
+  }
+
   test("hive table: files read and cached when filesource partition management is off") {
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
       withTable("test") {



