From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back
Date: Wed, 15 Jun 2016 18:50:57 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 9a5071996 -> e1585cc74


[SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back

## What changes were proposed in this pull request?

This PR adds back support for the conf `hive.metastore.warehouse.dir`. With this patch, the warehouse dir is determined as follows:

* If `spark.sql.warehouse.dir` is set, `hive.metastore.warehouse.dir` is automatically set to the value of `spark.sql.warehouse.dir`, which becomes the effective warehouse dir.
* If `spark.sql.warehouse.dir` is not set but `hive.metastore.warehouse.dir` is set, `spark.sql.warehouse.dir` is automatically set to the value of `hive.metastore.warehouse.dir`, which becomes the effective warehouse dir.
* If neither `spark.sql.warehouse.dir` nor `hive.metastore.warehouse.dir` is set, `hive.metastore.warehouse.dir` is automatically set to the default value of `spark.sql.warehouse.dir`, which becomes the effective warehouse dir.

## How was this patch tested?

Added a `set hive.metastore.warehouse.dir` test to `HiveSparkSubmitSuite`.

JIRA: https://issues.apache.org/jira/browse/SPARK-15959

Author: Yin Huai

Closes #13679 from yhuai/hiveWarehouseDir.
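
The three rules above reduce to a single precedence order: `spark.sql.warehouse.dir` wins when present, then `hive.metastore.warehouse.dir`, then the built-in default of `spark.sql.warehouse.dir`; after resolution both confs hold the same effective path. A minimal Scala sketch of that resolution follows; the object, helper, and parameter names are hypothetical illustrations, not code from this patch (the actual change is in the diff below).

```scala
object WarehouseDirResolution {
  // Hypothetical helper modeling the precedence rules above; not part of the patch.
  // Returns the single effective warehouse dir that both spark.sql.warehouse.dir
  // and hive.metastore.warehouse.dir end up holding after resolution.
  def resolveWarehouseDir(
      sparkWarehouseDir: Option[String], // spark.sql.warehouse.dir, if set
      hiveWarehouseDir: Option[String],  // hive.metastore.warehouse.dir, if set
      defaultWarehouseDir: String        // default value of spark.sql.warehouse.dir
  ): String = (sparkWarehouseDir, hiveWarehouseDir) match {
    case (Some(sparkDir), _)   => sparkDir            // rule 1: the Spark conf wins
    case (None, Some(hiveDir)) => hiveDir             // rule 2: the Hive conf fills the gap
    case (None, None)          => defaultWarehouseDir // rule 3: fall back to the default
  }

  def main(args: Array[String]): Unit = {
    // One check per rule; both confs would be set to the returned value.
    assert(resolveWarehouseDir(Some("/spark-wh"), Some("/hive-wh"), "/default-wh") == "/spark-wh")
    assert(resolveWarehouseDir(None, Some("/hive-wh"), "/default-wh") == "/hive-wh")
    assert(resolveWarehouseDir(None, None, "/default-wh") == "/default-wh")
  }
}
```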
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1585cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1585cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1585cc7

Branch: refs/heads/master
Commit: e1585cc74853c497271eecdc943c0eabe1aeb4c1
Parents: 9a50719
Author: Yin Huai
Authored: Wed Jun 15 11:50:54 2016 -0700
Committer: Reynold Xin
Committed: Wed Jun 15 11:50:54 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/internal/SharedState.scala | 29 ++++++-
 .../apache/spark/sql/hive/HiveSharedState.scala | 13 +--
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   | 91 +++++++++++++++++---
 3 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c37f7f1..bc349b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
 import org.apache.spark.sql.execution.CacheManager
@@ -30,7 +31,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  */
-private[sql] class SharedState(val sparkContext: SparkContext) {
+private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
   /**
    * Class for caching query results reused in future executions.
@@ -46,7 +47,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
    * The base hadoop configuration which is shared among all spark sessions. It is based on the
    * default hadoop configuration of Spark, with custom configurations inside `hive-site.xml`.
    */
-  lazy val hadoopConf: Configuration = {
+  val hadoopConf: Configuration = {
     val conf = new Configuration(sparkContext.hadoopConfiguration)
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
@@ -66,6 +67,30 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
   val jarClassLoader = new NonClosableMutableURLClassLoader(
     org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
+  {
+    // Set the Hive metastore warehouse path to the one we use
+    val tempConf = new SQLConf
+    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
+    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
+      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+      // we will respect the value of hive.metastore.warehouse.dir.
+      tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+      sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
+        s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
+        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
+    } else {
+      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+      // the value of spark.sql.warehouse.dir.
+      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+      sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
+    }
+
+    logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
+  }
+
   /**
    * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
    */


http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index 78b1ecb..6b7a333 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -18,9 +18,8 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SharedState
 
 
 /**
@@ -28,18 +27,10 @@ import org.apache.spark.sql.internal.{SharedState, SQLConf}
  * [[org.apache.spark.sql.SparkSession]] backed by Hive.
  */
 private[hive] class HiveSharedState(override val sparkContext: SparkContext)
-  extends SharedState(sparkContext) with Logging {
+  extends SharedState(sparkContext) {
 
   // TODO: just share the IsolatedClientLoader instead of the client instance itself
 
-  {
-    // Set the Hive metastore warehouse path to the one we use
-    val tempConf = new SQLConf
-    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
-    sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
-    logInfo(s"Setting Hive metastore warehouse path to '${tempConf.warehousePath}'")
-  }
-
   /**
    * A Hive client used to interact with the metastore.
   */


http://git-wip-us.apache.org/repos/asf/spark/blob/e1585cc7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d56bede..9bca720 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
+import java.io.{BufferedWriter, File, FileWriter}
 import java.sql.Timestamp
 import java.util.Date
 
@@ -205,7 +205,7 @@ class HiveSparkSubmitSuite
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(
       "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
-      "--name", "SetWarehouseLocationTest",
+      "--name", "SetSparkWarehouseLocationTest",
       "--master", "local-cluster[2,1,1024]",
       "--conf", "spark.ui.enabled=false",
       "--conf", "spark.master.rest.enabled=false",
@@ -214,6 +214,45 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("set hive.metastore.warehouse.dir") {
+    // In this test, we set hive.metastore.warehouse.dir in hive-site.xml but
+    // do not set spark.sql.warehouse.dir. So, the warehouse dir should be
+    // the value of hive.metastore.warehouse.dir. Also, the value of
+    // spark.sql.warehouse.dir should be set to the value of hive.metastore.warehouse.dir.
+
+    val hiveWarehouseLocation = Utils.createTempDir()
+    hiveWarehouseLocation.delete()
+    val hiveSiteXmlContent =
+      s"""
+         |<configuration>
+         |  <property>
+         |    <name>hive.metastore.warehouse.dir</name>
+         |    <value>$hiveWarehouseLocation</value>
+         |  </property>
+         |</configuration>
+       """.stripMargin
+
+    // Write a hive-site.xml containing a setting of hive.metastore.warehouse.dir.
+    val hiveSiteDir = Utils.createTempDir()
+    val file = new File(hiveSiteDir.getCanonicalPath, "hive-site.xml")
+    val bw = new BufferedWriter(new FileWriter(file))
+    bw.write(hiveSiteXmlContent)
+    bw.close()
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+      "--name", "SetHiveWarehouseLocationTest",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.test.expectedWarehouseDir=$hiveWarehouseLocation",
+      "--conf", s"spark.driver.extraClassPath=${hiveSiteDir.getCanonicalPath}",
+      "--driver-java-options", "-Dderby.system.durability=test",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -277,19 +316,43 @@ class HiveSparkSubmitSuite
 object SetWarehouseLocationTest extends Logging {
   def main(args: Array[String]): Unit = {
     Utils.configTestLog4j("INFO")
-    val warehouseLocation = Utils.createTempDir()
-    warehouseLocation.delete()
-    val hiveWarehouseLocation = Utils.createTempDir()
-    hiveWarehouseLocation.delete()
-    // We will use the value of spark.sql.warehouse.dir override the
-    // value of hive.metastore.warehouse.dir.
-    val sparkSession = SparkSession.builder()
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val builder = SparkSession.builder()
+      .config(sparkConf)
       .config("spark.ui.enabled", "false")
-      .config("spark.sql.warehouse.dir", warehouseLocation.toString)
-      .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
       .enableHiveSupport()
-      .getOrCreate()
+    val providedExpectedWarehouseLocation =
+      sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
+
+    val (sparkSession, expectedWarehouseLocation) = providedExpectedWarehouseLocation match {
+      case Some(warehouseDir) =>
+        // If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
+        // through spark-submit. So, neither spark.sql.warehouse.dir nor
+        // hive.metastore.warehouse.dir is set here.
+        (builder.getOrCreate(), warehouseDir)
+      case None =>
+        val warehouseLocation = Utils.createTempDir()
+        warehouseLocation.delete()
+        val hiveWarehouseLocation = Utils.createTempDir()
+        hiveWarehouseLocation.delete()
+        // If spark.sql.test.expectedWarehouseDir is not set, we will set
+        // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
+        // We are expecting that the value of spark.sql.warehouse.dir will override the
+        // value of hive.metastore.warehouse.dir.
+        val session = builder
+          .config("spark.sql.warehouse.dir", warehouseLocation.toString)
+          .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+          .getOrCreate()
+        (session, warehouseLocation.toString)
+
+    }
+
+    if (sparkSession.conf.get("spark.sql.warehouse.dir") != expectedWarehouseLocation) {
+      throw new Exception(
+        "spark.sql.warehouse.dir is not set to the expected warehouse location " +
+        s"$expectedWarehouseLocation.")
+    }
 
     val catalog = sparkSession.sessionState.catalog
 
@@ -301,7 +364,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
-        "file:" + warehouseLocation.toString + "/testlocation"
+        "file:" + expectedWarehouseLocation.toString + "/testlocation"
       val actualLocation = tableMetadata.storage.locationUri.get
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -317,7 +380,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
       val expectedLocation =
-        "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+        "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
       val actualLocation = tableMetadata.storage.locationUri.get
       if (actualLocation != expectedLocation) {
         throw new Exception(
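
For completeness, a hedged usage sketch of the behavior this patch restores; it is not part of the commit. It assumes a `hive-site.xml` that sets `hive.metastore.warehouse.dir` (like the one written by the new test) is already on the application classpath, that `spark.sql.warehouse.dir` is left unset, and that the `local[2]` master and app name are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession

object WarehouseDirCheck {
  def main(args: Array[String]): Unit = {
    // spark.sql.warehouse.dir is deliberately not set here; only hive-site.xml
    // (assumed to be on the classpath) provides hive.metastore.warehouse.dir.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("WarehouseDirCheck")
      .enableHiveSupport()
      .getOrCreate()

    // With this patch, spark.sql.warehouse.dir should have been filled in from
    // hive.metastore.warehouse.dir, so this prints the Hive warehouse path.
    println(spark.conf.get("spark.sql.warehouse.dir"))
    spark.stop()
  }
}
```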