spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
Date Tue, 28 Jun 2016 14:54:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master e158478a9 -> 0923c4f56


[SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing
Scala SparkContext's SparkConf

## What changes were proposed in this pull request?
When we create a SparkSession on the Python side, a SparkContext may already have been
created. In this case, we need to set the SparkSession builder's configs on the Scala SparkContext's
SparkConf (this is necessary because conf changes on an active Python SparkContext are not
propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. Hive support
is not enabled even if enableHiveSupport is called).

## How was this patch tested?
New tests and manual tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #13931 from yhuai/SPARK-16224.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0923c4f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0923c4f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0923c4f5

Branch: refs/heads/master
Commit: 0923c4f5676691e28e70ecb05890e123540b91f0
Parents: e158478
Author: Yin Huai <yhuai@databricks.com>
Authored: Tue Jun 28 07:54:44 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Tue Jun 28 07:54:44 2016 -0700

----------------------------------------------------------------------
 python/pyspark/context.py     |  2 ++
 python/pyspark/sql/session.py |  7 +++++++
 python/pyspark/sql/tests.py   | 43 +++++++++++++++++++++++++++++++++++++-
 python/pyspark/tests.py       |  8 +++++++
 4 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 7217a99..6e9f24e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -166,6 +166,8 @@ class SparkContext(object):
 
         # Create the Java SparkContext through Py4J
         self._jsc = jsc or self._initialize_context(self._conf._jconf)
+        # Reset the SparkConf to the one actually used by the SparkContext in JVM.
+        self._conf = SparkConf(_jconf=self._jsc.sc().conf())
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server

http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0c8024e..b4152a3 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -165,6 +165,13 @@ class SparkSession(object):
                     for key, value in self._options.items():
                         sparkConf.set(key, value)
                     sc = SparkContext.getOrCreate(sparkConf)
+                    # This SparkContext may be an existing one.
+                    for key, value in self._options.items():
+                        # we need to propagate the confs
+                        # before we create the SparkSession. Otherwise, confs like
+                        # warehouse path and metastore url will not be set correctly (
+                        # these confs cannot be changed once the SparkSession is created).
+                        sc._conf.set(key, value)
                     session = SparkSession(sc)
                 for key, value in self._options.items():
                     session.conf.set(key, value)

http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3f56411..f863485 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -22,6 +22,7 @@ individual modules.
 """
 import os
 import sys
+import subprocess
 import pydoc
 import shutil
 import tempfile
@@ -48,7 +49,7 @@ else:
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
-from pyspark.tests import ReusedPySparkTestCase
+from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase):
             lambda: spark.catalog.uncacheTable("does_not_exist"))
 
 
+class HiveSparkSubmitTests(SparkSubmitTests):
+
+    def test_hivecontext(self):
+        # This test checks that HiveContext is using Hive metastore (SPARK-16224).
+        # It sets a metastore url and checks if there is a derby dir created by
+        # Hive metastore. If this derby dir exists, HiveContext is using
+        # Hive metastore.
+        metastore_path = os.path.join(tempfile.mkdtemp(), "spark16224_metastore_db")
+        metastore_URL = "jdbc:derby:;databaseName=" + metastore_path + ";create=true"
+        hive_site_dir = os.path.join(self.programDir, "conf")
+        hive_site_file = self.createTempFile("hive-site.xml", ("""
+            |<configuration>
+            |  <property>
+            |  <name>javax.jdo.option.ConnectionURL</name>
+            |  <value>%s</value>
+            |  </property>
+            |</configuration>
+            """ % metastore_URL).lstrip(), "conf")
+        script = self.createTempFile("test.py", """
+            |import os
+            |
+            |from pyspark.conf import SparkConf
+            |from pyspark.context import SparkContext
+            |from pyspark.sql import HiveContext
+            |
+            |conf = SparkConf()
+            |sc = SparkContext(conf=conf)
+            |hive_context = HiveContext(sc)
+            |print(hive_context.sql("show databases").collect())
+            """)
+        proc = subprocess.Popen(
+            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
+             "--driver-class-path", hive_site_dir, script],
+            stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("default", out.decode('utf-8'))
+        self.assertTrue(os.path.exists(metastore_path))
+
+
 class HiveContextSQLTests(ReusedPySparkTestCase):
 
     @classmethod

http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 222c5ca..0a029b6 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1921,6 +1921,14 @@ class ContextTests(unittest.TestCase):
             post_parallalize_temp_files = os.listdir(sc._temp_dir)
             self.assertEqual(temp_files, post_parallalize_temp_files)
 
+    def test_set_conf(self):
+        # This is for an internal use case. When there is an existing SparkContext,
+        # SparkSession's builder needs to set configs into SparkContext's conf.
+        sc = SparkContext()
+        sc._conf.set("spark.test.SPARK16224", "SPARK16224")
+        self.assertEqual(sc._jsc.sc().conf().get("spark.test.SPARK16224"), "SPARK16224")
+        sc.stop()
+
     def test_stop(self):
         sc = SparkContext()
         self.assertNotEqual(SparkContext._active_spark_context, None)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message