spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-15365][SQL] When table size statistics are not available from metastore, we should fallback to HDFS
Date Wed, 25 May 2016 03:58:36 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5504f60e8 -> e13cfd6d2


[SPARK-15365][SQL] When table size statistics are not available from metastore, we should
fallback to HDFS

## What changes were proposed in this pull request?
Currently if a table is used in join operation we rely on Metastore returned size to calculate
if we can convert the operation to Broadcast join. This optimization only kicks in for table's
that have the statistics available in metastore. Hive generally rolls over to HDFS if the
statistics are not available directly from metastore and this seems like a reasonable choice
to adopt given the optimization benefit of using broadcast joins.

## How was this patch tested?
I have executed queries locally to test.

Author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>

Closes #13150 from Parth-Brahmbhatt/SPARK-15365.

(cherry picked from commit 4acababcaba567c85f3be0d5e939d99119b82d1d)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e13cfd6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e13cfd6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e13cfd6d

Branch: refs/heads/branch-2.0
Commit: e13cfd6d265cd47365cda1a0452133434e0515df
Parents: 5504f60
Author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>
Authored: Tue May 24 20:58:20 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue May 24 20:58:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../spark/sql/hive/MetastoreRelation.scala      | 33 +++++++++----
 .../apache/spark/sql/hive/StatisticsSuite.scala | 50 +++++++++++++++++++-
 3 files changed, 83 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b91518a..4efefda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+    SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+    .doc("If the table statistics are not available from table metadata enable fall back
to hdfs." +
+      " This is useful in determining if a table is small enough to use auto broadcast joins.")
+    .booleanConf
+    .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to a larger
" +
@@ -603,6 +610,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with
Logging {
 
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
+  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228..9c82014 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
+
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should
be
-      // relatively cheap if parameters for the table are populated into the metastore. 
An
-      // alternative would be going through Hadoop's FileSystem API, which can be expensive
if a lot
-      // of RPCs are involved.  Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
       BigInt(
         // When table is external,`totalSize` is always zero, which will influence join strategy
         // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+        // if the size is still less than zero, we try to get the file size from HDFS.
+        // given this is only needed for optimization, if the HDFS call fails we return the
default.
+        if (totalSize != null && totalSize.toLong > 0L) {
+          totalSize.toLong
+        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+          rawDataSize.toLong
+        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+          try {
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+            fs.getContentSummary(hiveQlTable.getPath).getLength
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to get table size from hdfs.", e)
+              sparkSession.sessionState.conf.defaultSizeInBytes
+          }
+        } else {
+          sparkSession.sessionState.conf.defaultSizeInBytes
+        })
     }
   )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0..f8e00a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.{File, PrintWriter}
+
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   import hiveContext.sql
 
   test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
       classOf[AnalyzeTableCommand])
   }
 
+  test("MetastoreRelations fallback to HDFS for size estimation") {
+    val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+    try {
+      withTempDir { tempDir =>
+
+        // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+        val file1 = new File(tempDir + "/data1")
+        val writer1 = new PrintWriter(file1)
+        writer1.write("1,2")
+        writer1.close()
+
+        val file2 = new File(tempDir + "/data2")
+        val writer2 = new PrintWriter(file2)
+        writer2.write("1,2")
+        writer2.close()
+
+        sql(
+          s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+            WITH SERDEPROPERTIES (
+              \"separatorChar\" = \",\",
+              \"quoteChar\"     = \"\\\"\",
+              \"escapeChar\"    = \"\\\\\")
+            LOCATION '$tempDir'
+          """)
+
+        hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+        val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+          .asInstanceOf[MetastoreRelation]
+
+        val properties = relation.hiveQlTable.getParameters
+        assert(properties.get("totalSize").toLong <= 0, "external table totalSize must
be <= 0")
+        assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize
must be <= 0")
+
+        val sizeInBytes = relation.statistics.sizeInBytes
+        assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+      }
+    } finally {
+      hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+      sql("DROP TABLE csv_table ")
+    }
+  }
+
   ignore("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       hiveContext.sessionState.catalog.lookupRelation(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message