spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
Date Sun, 01 May 2016 01:05:03 GMT
Repository: spark
Updated Branches:
  refs/heads/master 19a6d192d -> 0182d9599


[SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location

This PR contains three changes:
1. We will use spark.sql.warehouse.dir to set the warehouse location; we will no longer use hive.metastore.warehouse.dir (a usage sketch follows below).
2. SessionCatalog needs to set the location of the default db. Otherwise, when a table is created in a SparkSession without Hive support, the default db's path will be an empty string.
3. When we create a database, we need to make its path qualified.

Tested with existing tests and new tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12812 from yhuai/warehouse.
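
For context, a minimal usage sketch of change 1 above (not part of the patch): the session bootstrap follows the SetWarehouseLocationTest added in this commit, and the master, app name, and paths are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("warehouse-location-example")
      // spark.sql.warehouse.dir now determines the warehouse location;
      // hive.metastore.warehouse.dir is overridden with the same value.
      .set("spark.sql.warehouse.dir", "/tmp/my-warehouse")

    val sc = new SparkContext(conf)
    val spark = SparkSession.withHiveSupport(sc)

    // Databases and tables created without an explicit LOCATION resolve under
    // the configured warehouse dir, e.g. file:/tmp/my-warehouse/mydb.db/t.
    spark.sql("CREATE DATABASE mydb")
    spark.sql("CREATE TABLE mydb.t (a INT)")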


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0182d959
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0182d959
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0182d959

Branch: refs/heads/master
Commit: 0182d9599d15f70eeb6288bf9294fa677004bd14
Parents: 19a6d19
Author: Yin Huai <yhuai@databricks.com>
Authored: Sat Apr 30 18:04:36 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sat Apr 30 18:04:42 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  35 ++++++-
 .../org/apache/spark/sql/SparkSession.scala     |  18 +++-
 .../spark/sql/internal/SessionState.scala       |   3 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 103 +++++++++++++++++--
 .../spark/sql/hive/HiveSessionCatalog.scala     |  11 +-
 .../spark/sql/hive/HiveSessionState.scala       |   3 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala   |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |   2 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   8 ++
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  73 ++++++++++++-
 10 files changed, 236 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24b..a445a25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -44,14 +45,21 @@ class SessionCatalog(
     externalCatalog: ExternalCatalog,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: CatalystConf) extends Logging {
+    conf: CatalystConf,
+    hadoopConf: Configuration) extends Logging {
   import CatalogTypes.TablePartitionSpec
 
+  // For testing only.
   def this(
       externalCatalog: ExternalCatalog,
       functionRegistry: FunctionRegistry,
       conf: CatalystConf) {
-    this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
+    this(
+      externalCatalog,
+      DummyFunctionResourceLoader,
+      functionRegistry,
+      conf,
+      new Configuration())
   }
 
   // For testing only.
@@ -68,7 +76,8 @@ class SessionCatalog(
   // the corresponding item in the current database.
   protected var currentDb = {
     val defaultName = "default"
-    val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+    val defaultDbDefinition =
+      CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
     // Initialize default database if it doesn't already exist
     createDatabase(defaultDbDefinition, ignoreIfExists = true)
     defaultName
@@ -81,6 +90,18 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * This method is used to make the given path qualified before we
+   * store this path in the underlying external catalog. So, when a path
+   * does not contain a scheme, this path will not be changed after the default
+   * FileSystem is changed.
+   */
+  private def makeQualifiedPath(path: String): Path = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(hadoopConf)
+    fs.makeQualified(hadoopPath)
+  }
+
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
@@ -88,7 +109,10 @@ class SessionCatalog(
   // ----------------------------------------------------------------------------
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
-    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    externalCatalog.createDatabase(
+      dbDefinition.copy(locationUri = qualifiedPath),
+      ignoreIfExists)
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
@@ -125,7 +149,8 @@ class SessionCatalog(
   }
 
   def getDefaultDBPath(db: String): String = {
-    new Path(new Path(conf.warehousePath), db + ".db").toString
+    val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+    new Path(new Path(conf.warehousePath), database + ".db").toString
   }
 
   // ----------------------------------------------------------------------------

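As an illustration of the qualification step added above, a small sketch (not from the patch) that replays what makeQualifiedPath does against a local Hadoop Configuration; the path is a placeholder.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val hadoopConf = new Configuration()
    // A scheme-less location, as a user might pass to CREATE DATABASE ... LOCATION.
    val raw = new Path("/tmp/spark-warehouse/db1.db")
    val fs = raw.getFileSystem(hadoopConf)
    // With the default local file system this prints file:/tmp/spark-warehouse/db1.db,
    // so the URI stored in the external catalog keeps its scheme even if
    // fs.defaultFS later points somewhere else.
    println(fs.makeQualified(raw))
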
http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7d3ff9e..4c2a7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalog.Catalog
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -53,7 +54,7 @@ import org.apache.spark.util.Utils
 class SparkSession private(
     @transient val sparkContext: SparkContext,
     @transient private val existingSharedState: Option[SharedState])
-  extends Serializable { self =>
+  extends Serializable with Logging { self =>
 
   def this(sc: SparkContext) {
     this(sc, None)
@@ -64,6 +65,19 @@ class SparkSession private(
    |  Session-related state  |
    * ----------------------- */
 
+  {
+    val defaultWarehousePath =
+      SQLConf.WAREHOUSE_PATH
+        .defaultValueString
+        .replace("${system:user.dir}", System.getProperty("user.dir"))
+    val warehousePath = sparkContext.conf.get(
+      SQLConf.WAREHOUSE_PATH.key,
+      defaultWarehousePath)
+    sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, warehousePath)
+    sparkContext.conf.set("hive.metastore.warehouse.dir", warehousePath)
+    logInfo(s"Setting warehouse location to $warehousePath")
+  }
+
   /**
    * State shared across sessions, including the [[SparkContext]], cached data, listener,
    * and a catalog that interacts with external systems.

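A small sketch of the substitution performed in the initialization block above; the literal default of SQLConf.WAREHOUSE_PATH is an assumption here, only the ${system:user.dir} handling is taken from the patch.

    // Assumed shape of SQLConf.WAREHOUSE_PATH.defaultValueString.
    val defaultValueString = "${system:user.dir}/spark-warehouse"
    val defaultWarehousePath =
      defaultValueString.replace("${system:user.dir}", System.getProperty("user.dir"))
    // e.g. /home/alice/work/spark-warehouse when the JVM was started in /home/alice/work
    println(defaultWarehousePath)
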
http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 6fa044a..ebff756 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -99,7 +99,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     sparkSession.externalCatalog,
     functionResourceLoader,
     functionRegistry,
-    conf)
+    conf,
+    newHadoopConf())
 
   /**
    * Interface exposed to the user for registering user-defined functions.

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4162329..12acb9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -64,15 +65,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
-    catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
+    catalog.createDatabase(
+      CatalogDatabase(name, "", sqlContext.conf.warehousePath, Map()), ignoreIfExists = false)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    val storage =
+      CatalogStorageFormat(
+        locationUri = Some(catalog.defaultTablePath(name)),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
     catalog.createTable(CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat(None, None, None, None, Map()),
-      schema = Seq()), ignoreIfExists = false)
+      storage = storage,
+      schema = Seq(),
+      createTime = 0L), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -83,6 +93,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
+  test("the qualified path of a database is stored in the catalog") {
+    val catalog = sqlContext.sessionState.catalog
+
+    val path = System.getProperty("java.io.tmpdir")
+    // The generated temp path is not qualified.
+    assert(!path.startsWith("file:/"))
+    sql(s"CREATE DATABASE db1 LOCATION '$path'")
+    val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
+    assert("file" === pathInCatalog.getScheme)
+    assert(path === pathInCatalog.getPath)
+
+    withSQLConf(
+      SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
+      sql(s"CREATE DATABASE db2")
+      val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+      assert("file" === pathInCatalog.getScheme)
+      assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
+    }
+
+    sql("DROP DATABASE db1")
+    sql("DROP DATABASE db2")
+  }
+
   test("Create/Drop Database") {
     withSQLConf(
        SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
@@ -96,10 +129,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
           sql(s"CREATE DATABASE $dbName")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+          val expectedLocation =
+            "file:" + System.getProperty("java.io.tmpdir") +
+              File.separator + s"$dbNameWithoutBackTicks.db"
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+            expectedLocation,
             Map.empty))
           sql(s"DROP DATABASE $dbName CASCADE")
           assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -121,10 +157,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+          val expectedLocation =
+            "file:" + System.getProperty("java.io.tmpdir") +
+              File.separator + s"$dbNameWithoutBackTicks.db"
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+            expectedLocation,
             Map.empty))
 
           val message = intercept[AnalysisException] {
@@ -148,7 +187,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           val location =
-            System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+            "file:" + System.getProperty("java.io.tmpdir") +
+              File.separator + s"$dbNameWithoutBackTicks.db"
+
           sql(s"CREATE DATABASE $dbName")
 
           checkAnswer(
@@ -210,6 +251,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   // TODO: test drop database in restrict mode
 
+  test("create table in default db") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent1 = TableIdentifier("tab1", None)
+    createTable(catalog, tableIdent1)
+    val expectedTableIdent = tableIdent1.copy(database = Some("default"))
+    val expectedLocation =
+      catalog.getDatabaseMetadata("default").locationUri + "/tab1"
+    val expectedStorage =
+      CatalogStorageFormat(
+        locationUri = Some(expectedLocation),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
+    val expectedTable =
+      CatalogTable(
+        identifier = expectedTableIdent,
+        tableType = CatalogTableType.EXTERNAL,
+        storage = expectedStorage,
+        schema = Seq(),
+        createTime = 0L)
+    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+  }
+
+  test("create table in a specific db") {
+    val catalog = sqlContext.sessionState.catalog
+    createDatabase(catalog, "dbx")
+    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
+    createTable(catalog, tableIdent1)
+    val expectedLocation =
+      catalog.getDatabaseMetadata("dbx").locationUri + "/tab1"
+    val expectedStorage =
+      CatalogStorageFormat(
+        locationUri = Some(expectedLocation),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
+    val expectedTable =
+      CatalogTable(
+        identifier = tableIdent1,
+        tableType = CatalogTableType.EXTERNAL,
+        storage = expectedStorage,
+        schema = Seq(),
+        createTime = 0L)
+    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+  }
+
   test("alter table: rename") {
     val catalog = sqlContext.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
@@ -534,7 +623,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
     assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 456587e..f023edb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -44,8 +45,14 @@ private[sql] class HiveSessionCatalog(
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf)
-  extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends SessionCatalog(
+    externalCatalog,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) {
 
   override def setCurrentDatabase(db: String): Unit = {
     super.setCurrentDatabase(db)

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 57aa4b2..31f28f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -54,7 +54,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       sparkSession,
       functionResourceLoader,
       functionRegistry,
-      conf)
+      conf,
+      newHadoopConf())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index be89edb..d033b05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -379,7 +379,7 @@ private[spark] object HiveUtils extends Logging {
         propMap.put(confvar.varname, confvar.getDefaultExpr())
       }
     }
-    propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString)
+    propMap.put(SQLConf.WAREHOUSE_PATH.key, localMetastore.toURI.toString)
     propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
       s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
     propMap.put("datanucleus.rdbms.datastoreAdapterClassName",

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 78ba2bf..cdfadfa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -179,7 +179,7 @@ private[hive] class HiveClientImpl(
 
   // Log the default warehouse location.
   logInfo(
-    s"Default warehouse location for Hive client " +
+    s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
   /** Returns the configuration for the current session. */

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b41d882..42746ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -115,6 +115,12 @@ private[hive] class TestHiveSparkSession(
     @transient private val existingSharedState: Option[TestHiveSharedState])
   extends SparkSession(sc) with Logging { self =>
 
+  // TODO: We need to set the temp warehouse path in sc's conf.
+  // Right now, in SparkSession, we set the warehouse path to the default one
+  // instead of the temp one. Then, we override the setting in TestHiveSharedState
+  // when we create metadataHive. This flow is not easy to follow and can introduce
+  // confusion when a developer is debugging an issue. We need to refactor this part
+  // to just set the temp warehouse path in sc's conf.
   def this(sc: SparkContext) {
     this(
       sc,
@@ -573,6 +579,8 @@ private[hive] object TestHiveContext {
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
     HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
+      // Override WAREHOUSE_PATH and METASTOREWAREHOUSE to use the given path.
+      SQLConf.WAREHOUSE_PATH.key -> warehousePath.toURI.toString,
       ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
       ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/0182d959/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index cc05e1d..77a6a94 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
-import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -202,6 +202,19 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("set spark.sql.warehouse.dir") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+      "--name", "SetWarehouseLocationTest",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--driver-java-options", "-Dderby.system.durability=test",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -262,6 +275,62 @@ class HiveSparkSubmitSuite
   }
 }
 
+object SetWarehouseLocationTest extends Logging {
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+    val warehouseLocation = Utils.createTempDir()
+    warehouseLocation.delete()
+    val hiveWarehouseLocation = Utils.createTempDir()
+    hiveWarehouseLocation.delete()
+
+    val conf = new SparkConf()
+    conf.set("spark.ui.enabled", "false")
+    // We will use the value of spark.sql.warehouse.dir to override the
+    // value of hive.metastore.warehouse.dir.
+    conf.set("spark.sql.warehouse.dir", warehouseLocation.toString)
+    conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+
+    val sc = new SparkContext(conf)
+    val sparkSession = SparkSession.withHiveSupport(sc)
+    val catalog = sparkSession.sessionState.catalog
+
+    sparkSession.sql("drop table if exists testLocation")
+    sparkSession.sql("drop database if exists testLocationDB cascade")
+
+    {
+      sparkSession.sql("create table testLocation (a int)")
+      val tableMetadata =
+        catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
+      val expectedLocation =
+        "file:" + warehouseLocation.toString + "/testlocation"
+      val actualLocation = tableMetadata.storage.locationUri.get
+      if (actualLocation != expectedLocation) {
+        throw new Exception(
+          s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+      }
+      sparkSession.sql("drop table testLocation")
+    }
+
+    {
+      sparkSession.sql("create database testLocationDB")
+      sparkSession.sql("use testLocationDB")
+      sparkSession.sql("create table testLocation (a int)")
+      val tableMetadata =
+        catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
+      val expectedLocation =
+        "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+      val actualLocation = tableMetadata.storage.locationUri.get
+      if (actualLocation != expectedLocation) {
+        throw new Exception(
+          s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+      }
+      sparkSession.sql("drop table testLocation")
+      sparkSession.sql("use default")
+      sparkSession.sql("drop database testLocationDB")
+    }
+  }
+}
+
 // This application is used to test defining a new Hive UDF (with an associated jar)
 // and use this UDF. We need to run this test in separate JVM to make sure we
 // can load the jar defined with the function.


