spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back
Date Wed, 15 Jun 2016 18:50:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9a5071996 -> e1585cc74


[SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back

## What changes were proposed in this pull request?
This PR adds support for the conf `hive.metastore.warehouse.dir` back. With this patch, the
warehouse dir is resolved as follows (see the sketch after this list):
* If `spark.sql.warehouse.dir` is set, `hive.metastore.warehouse.dir` is automatically set to
the value of `spark.sql.warehouse.dir`, which becomes the effective warehouse dir.
* If `spark.sql.warehouse.dir` is not set but `hive.metastore.warehouse.dir` is set,
`spark.sql.warehouse.dir` is automatically set to the value of `hive.metastore.warehouse.dir`,
which becomes the effective warehouse dir.
* If neither conf is set, `hive.metastore.warehouse.dir` is automatically set to the default
value of `spark.sql.warehouse.dir`, which becomes the effective warehouse dir.
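In short, `spark.sql.warehouse.dir` always wins when both confs are set. Below is a minimal,
self-contained sketch of this resolution order; plain Scala `Map`s stand in for the real
`SparkConf` and Hadoop `Configuration`, and the default value shown is only a placeholder:

```scala
// Sketch of the warehouse-dir resolution order described above.
// Plain Maps stand in for SparkConf / Hadoop Configuration.
object WarehouseDirResolution {
  // Placeholder only; not Spark's actual default for spark.sql.warehouse.dir.
  val assumedDefault = "spark-warehouse"

  def resolve(sparkConf: Map[String, String], hadoopConf: Map[String, String]): String =
    sparkConf.get("spark.sql.warehouse.dir")                  // 1. Spark conf wins if set
      .orElse(hadoopConf.get("hive.metastore.warehouse.dir")) // 2. else fall back to the Hive conf
      .getOrElse(assumedDefault)                              // 3. else the Spark conf's default
  // In all three cases, hive.metastore.warehouse.dir is then set to the result.
}
```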

## How was this patch tested?
Added a new test, `set hive.metastore.warehouse.dir`, to `HiveSparkSubmitSuite`.
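
For reference, a hypothetical end-to-end check (a sketch, not part of this patch): with a
`hive-site.xml` that sets only `hive.metastore.warehouse.dir` on the driver classpath,
`spark.sql.warehouse.dir` should come back mirrored to that value:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: assumes a hive-site.xml that sets only
// hive.metastore.warehouse.dir is on the classpath.
object HiveWarehouseDirCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()
    // With this patch, spark.sql.warehouse.dir mirrors the value picked up
    // from hive.metastore.warehouse.dir in hive-site.xml.
    println(spark.conf.get("spark.sql.warehouse.dir"))
    spark.stop()
  }
}
```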

JIRA: https://issues.apache.org/jira/browse/SPARK-15959

Author: Yin Huai <yhuai@databricks.com>

Closes #13679 from yhuai/hiveWarehouseDir.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1585cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1585cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1585cc7

Branch: refs/heads/master
Commit: e1585cc74853c497271eecdc943c0eabe1aeb4c1
Parents: 9a50719
Author: Yin Huai <yhuai@databricks.com>
Authored: Wed Jun 15 11:50:54 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Jun 15 11:50:54 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/internal/SharedState.scala | 29 ++++++-
 .../apache/spark/sql/hive/HiveSharedState.scala | 13 +--
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   | 91 +++++++++++++++++---
 3 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c37f7f1..bc349b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
 import org.apache.spark.sql.execution.CacheManager
@@ -30,7 +31,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  */
-private[sql] class SharedState(val sparkContext: SparkContext) {
+private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
   /**
    * Class for caching query results reused in future executions.
@@ -46,7 +47,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
   * The base hadoop configuration which is shared among all spark sessions. It is based on the
    * default hadoop configuration of Spark, with custom configurations inside `hive-site.xml`.
    */
-  lazy val hadoopConf: Configuration = {
+  val hadoopConf: Configuration = {
     val conf = new Configuration(sparkContext.hadoopConfiguration)
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
@@ -66,6 +67,30 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
   val jarClassLoader = new NonClosableMutableURLClassLoader(
     org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
+  {
+    // Set the Hive metastore warehouse path to the one we use
+    val tempConf = new SQLConf
+    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
+    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
+      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+      // we will respect the value of hive.metastore.warehouse.dir.
+      tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+      sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
+        s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
+        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
+    } else {
+      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+      // the value of spark.sql.warehouse.dir.
+      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+      sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
+    }
+
+    logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
+  }
+
   /**
   * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index 78b1ecb..6b7a333 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -18,9 +18,8 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SharedState
 
 
 /**
@@ -28,18 +27,10 @@ import org.apache.spark.sql.internal.{SharedState, SQLConf}
  * [[org.apache.spark.sql.SparkSession]] backed by Hive.
  */
 private[hive] class HiveSharedState(override val sparkContext: SparkContext)
-  extends SharedState(sparkContext) with Logging {
+  extends SharedState(sparkContext) {
 
   // TODO: just share the IsolatedClientLoader instead of the client instance itself
 
-  {
-    // Set the Hive metastore warehouse path to the one we use
-    val tempConf = new SQLConf
-    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
-    sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
-    logInfo(s"Setting Hive metastore warehouse path to '${tempConf.warehousePath}'")
-  }
-
   /**
    * A Hive client used to interact with the metastore.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d56bede..9bca720 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
+import java.io.{BufferedWriter, File, FileWriter}
 import java.sql.Timestamp
 import java.util.Date
 
@@ -205,7 +205,7 @@ class HiveSparkSubmitSuite
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(
       "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
-      "--name", "SetWarehouseLocationTest",
+      "--name", "SetSparkWarehouseLocationTest",
       "--master", "local-cluster[2,1,1024]",
       "--conf", "spark.ui.enabled=false",
       "--conf", "spark.master.rest.enabled=false",
@@ -214,6 +214,45 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("set hive.metastore.warehouse.dir") {
+    // In this test, we set hive.metastore.warehouse.dir in hive-site.xml but
+    // not set spark.sql.warehouse.dir. So, the warehouse dir should be
+    // the value of hive.metastore.warehouse.dir. Also, the value of
+    // spark.sql.warehouse.dir should be set to the value of hive.metastore.warehouse.dir.
+
+    val hiveWarehouseLocation = Utils.createTempDir()
+    hiveWarehouseLocation.delete()
+    val hiveSiteXmlContent =
+      s"""
+         |<configuration>
+         |  <property>
+         |    <name>hive.metastore.warehouse.dir</name>
+         |    <value>$hiveWarehouseLocation</value>
+         |  </property>
+         |</configuration>
+     """.stripMargin
+
+    // Write a hive-site.xml containing a setting of hive.metastore.warehouse.dir.
+    val hiveSiteDir = Utils.createTempDir()
+    val file = new File(hiveSiteDir.getCanonicalPath, "hive-site.xml")
+    val bw = new BufferedWriter(new FileWriter(file))
+    bw.write(hiveSiteXmlContent)
+    bw.close()
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+      "--name", "SetHiveWarehouseLocationTest",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.test.expectedWarehouseDir=$hiveWarehouseLocation",
+      "--conf", s"spark.driver.extraClassPath=${hiveSiteDir.getCanonicalPath}",
+      "--driver-java-options", "-Dderby.system.durability=test",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -277,19 +316,43 @@ class HiveSparkSubmitSuite
 object SetWarehouseLocationTest extends Logging {
   def main(args: Array[String]): Unit = {
     Utils.configTestLog4j("INFO")
-    val warehouseLocation = Utils.createTempDir()
-    warehouseLocation.delete()
-    val hiveWarehouseLocation = Utils.createTempDir()
-    hiveWarehouseLocation.delete()
 
-    // We will use the value of spark.sql.warehouse.dir override the
-    // value of hive.metastore.warehouse.dir.
-    val sparkSession = SparkSession.builder()
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val builder = SparkSession.builder()
+      .config(sparkConf)
       .config("spark.ui.enabled", "false")
-      .config("spark.sql.warehouse.dir", warehouseLocation.toString)
-      .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
       .enableHiveSupport()
-      .getOrCreate()
+    val providedExpectedWarehouseLocation =
+      sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
+
+    val (sparkSession, expectedWarehouseLocation) = providedExpectedWarehouseLocation match {
+      case Some(warehouseDir) =>
+        // If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
+        // through spark-submit. So, neither spark.sql.warehouse.dir nor
+        // hive.metastore.warehouse.dir is set here.
+        (builder.getOrCreate(), warehouseDir)
+      case None =>
+        val warehouseLocation = Utils.createTempDir()
+        warehouseLocation.delete()
+        val hiveWarehouseLocation = Utils.createTempDir()
+        hiveWarehouseLocation.delete()
+        // If spark.sql.test.expectedWarehouseDir is not set, we will set
+        // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
+        // We are expecting that the value of spark.sql.warehouse.dir will override the
+        // value of hive.metastore.warehouse.dir.
+        val session = builder
+          .config("spark.sql.warehouse.dir", warehouseLocation.toString)
+          .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+          .getOrCreate()
+        (session, warehouseLocation.toString)
+
+    }
+
+    if (sparkSession.conf.get("spark.sql.warehouse.dir") != expectedWarehouseLocation) {
+      throw new Exception(
+        "spark.sql.warehouse.dir is not set to the expected warehouse location " +
+        s"$expectedWarehouseLocation.")
+    }
 
     val catalog = sparkSession.sessionState.catalog
 
@@ -301,7 +364,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
-        "file:" + warehouseLocation.toString + "/testlocation"
+        "file:" + expectedWarehouseLocation.toString + "/testlocation"
       val actualLocation = tableMetadata.storage.locationUri.get
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -317,7 +380,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
       val expectedLocation =
-        "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+        "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
       val actualLocation = tableMetadata.storage.locationUri.get
       if (actualLocation != expectedLocation) {
         throw new Exception(

