spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-13403][SQL] Pass hadoopConfiguration to HiveConf constructors.
Date Thu, 17 Mar 2016 05:57:08 GMT
Repository: spark
Updated Branches:
  refs/heads/master de1a84e56 -> 5faba9fac


[SPARK-13403][SQL] Pass hadoopConfiguration to HiveConf constructors.

This commit updates the HiveContext so that sc.hadoopConfiguration is used to instantiate
its internal instances of HiveConf.

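I tested this by overriding the S3 FileSystem implementation in spark-defaults.conf through the "spark.hadoop.fs.s3.impl" property
(to avoid [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810)). Spark copies "spark.hadoop.*" properties into
sc.hadoopConfiguration, so Hive only sees this override when its HiveConf instances are built from that configuration.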

Author: Ryan Blue <blue@apache.org>

Closes #11273 from rdblue/SPARK-13403-new-hive-conf-from-hadoop-conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5faba9fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5faba9fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5faba9fa

Branch: refs/heads/master
Commit: 5faba9faccb5ce43790c43284769e0f890340606
Parents: de1a84e
Author: Ryan Blue <blue@apache.org>
Authored: Wed Mar 16 22:57:06 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Mar 16 22:57:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveContext.scala    |  6 +++++-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  4 +++-
 .../sql/hive/client/IsolatedClientLoader.scala     |  8 ++++++--
 .../apache/spark/sql/hive/HiveCatalogSuite.scala   |  4 +++-
 .../spark/sql/hive/client/VersionsSuite.scala      | 17 +++++++++++++++++
 5 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5faba9fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 05fc569..4238ad1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
       sparkConf = sc.conf,
       execJars = Seq(),
+      hadoopConf = sc.hadoopConfiguration,
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
       baseClassLoader = Utils.getContextOrSparkClassLoader)
@@ -239,7 +240,7 @@ class HiveContext private[hive](
 
     // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
     // into the isolated client loader
-    val metadataConf = new HiveConf()
+    val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
 
     val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
     logInfo("default warehouse location is " + defaultWarehouseLocation)
@@ -279,6 +280,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
@@ -291,6 +293,7 @@ class HiveContext private[hive](
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sc.conf,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -320,6 +323,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,

http://git-wip-us.apache.org/repos/asf/spark/blob/5faba9fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3040ec9..a5f0bbf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -22,6 +22,7 @@ import java.io.{File, PrintStream}
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
@@ -62,6 +63,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
+    hadoopConf: Configuration,
     config: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -115,7 +117,7 @@ private[hive] class HiveClientImpl(
         // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
         originalState
       } else {
-        val initialConf = new HiveConf(classOf[SessionState])
+        val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
         // HiveConf is a Hadoop Configuration, which has a field of classLoader and
         // the initial value will be the current thread's context class loader
         // (i.e. initClassLoader at here).

http://git-wip-us.apache.org/repos/asf/spark/blob/5faba9fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 024f4df..932402a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
 import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkSubmitUtils
@@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveMetastoreVersion: String,
       hadoopVersion: String,
       sparkConf: SparkConf,
+      hadoopConf: Configuration,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
@@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveVersion(hiveMetastoreVersion),
       sparkConf,
       execJars = files,
+      hadoopConf = hadoopConf,
       config = config,
       sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
@@ -149,6 +152,7 @@ private[hive] object IsolatedClientLoader extends Logging {
 private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
     val sparkConf: SparkConf,
+    val hadoopConf: Configuration,
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
@@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader(
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
+      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader,
this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -249,7 +253,7 @@ private[hive] class IsolatedClientLoader(
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, sparkConf, config, classLoader, this)
+        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5faba9fa/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index 0dc4fea..427f574 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
@@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
-      sparkConf = new SparkConf()).createClient()
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/5faba9fa/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3d54da1..f218ab8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -63,12 +64,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
+  test("hadoop configuration preserved") {
+    val hadoopConf = new Configuration();
+    hadoopConf.set("test", "success")
+    val client = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+    assert("success" === client.getConf("test", null))
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e
@@ -98,6 +113,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
           hiveMetastoreVersion = "13",
           hadoopVersion = VersionInfo.getVersion,
           sparkConf = sparkConf,
+          hadoopConf = new Configuration(),
           config = buildConf(),
           ivyPath = ivyPath).createClient()
       }
@@ -118,6 +134,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
           hiveMetastoreVersion = version,
           hadoopVersion = VersionInfo.getVersion,
           sparkConf = sparkConf,
+          hadoopConf = new Configuration(),
           config = buildConf(),
           ivyPath = ivyPath).createClient()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message