spark-commits mailing list archives

From gurwls...@apache.org
Subject spark git commit: [SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate.
Date Thu, 27 Sep 2018 04:37:08 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5def10e61 -> ee214ef3a


[SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate.

## What changes were proposed in this pull request?

In [SPARK-20946](https://issues.apache.org/jira/browse/SPARK-20946), we modified `SparkSession.getOrCreate`
to not update the conf of an existing `SparkContext`, because a `SparkContext` is shared by all sessions.
This patch applies the same fix on the PySpark side.
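
A minimal sketch of the resulting behavior, mirroring the tests added in this patch (the config keys `key1`/`key2` are illustrative):

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# A SparkContext that already exists before the SparkSession is built.
sc = SparkContext("local[4]", "example", conf=SparkConf().set("key1", "value1"))

# getOrCreate() reuses the existing SparkContext. "key2" is applied to the
# session's SQL conf only; the shared SparkContext conf is left untouched.
session = SparkSession.builder.config("key2", "value2").getOrCreate()

assert session.conf.get("key2") == "value2"   # visible on the session
assert not sc.getConf().contains("key2")      # not propagated to the shared context
```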

## How was this patch tested?

Added tests.

Closes #22545 from ueshin/issues/SPARK-25525/not_update_existing_conf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee214ef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee214ef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee214ef3

Branch: refs/heads/master
Commit: ee214ef3a0ec36c4aae5040778d41c376df3da19
Parents: 5def10e
Author: Takuya UESHIN <ueshin@databricks.com>
Authored: Thu Sep 27 12:37:03 2018 +0800
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Thu Sep 27 12:37:03 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/session.py | 14 ++++--------
 python/pyspark/sql/tests.py   | 46 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee214ef3/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index a5e2872..079af8c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -156,7 +156,7 @@ class SparkSession(object):
             default.
 
             >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
-            >>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
+            >>> s1.conf.get("k1") == "v1"
             True
 
             In case an existing SparkSession is returned, the config options specified
@@ -179,19 +179,13 @@ class SparkSession(object):
                         sparkConf = SparkConf()
                         for key, value in self._options.items():
                             sparkConf.set(key, value)
-                        sc = SparkContext.getOrCreate(sparkConf)
                         # This SparkContext may be an existing one.
-                    for key, value in self._options.items():
-                        # we need to propagate the confs
-                        # before we create the SparkSession. Otherwise, confs like
-                        # warehouse path and metastore url will not be set correctly (
-                        # these confs cannot be changed once the SparkSession is created).
-                        sc._conf.set(key, value)
+                        sc = SparkContext.getOrCreate(sparkConf)
+                    # Do not update `SparkConf` for existing `SparkContext`, as it's shared
+                    # by all sessions.
                     session = SparkSession(sc)
                 for key, value in self._options.items():
                     session._jsparkSession.sessionState().conf().setConfString(key, value)
-                for key, value in self._options.items():
-                    session.sparkContext._conf.set(key, value)
                 return session
 
     builder = Builder()

http://git-wip-us.apache.org/repos/asf/spark/blob/ee214ef3/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 74642d4..64a7ceb 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -80,7 +80,7 @@ _have_pandas = _pandas_requirement_message is None
 _have_pyarrow = _pyarrow_requirement_message is None
 _test_compiled = _test_not_compiled_message is None
 
-from pyspark import SparkContext
+from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
@@ -283,6 +283,50 @@ class DataTypeTests(unittest.TestCase):
         self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
 
 
+class SparkSessionBuilderTests(unittest.TestCase):
+
+    def test_create_spark_context_first_then_spark_session(self):
+        sc = None
+        session = None
+        try:
+            conf = SparkConf().set("key1", "value1")
+            sc = SparkContext('local[4]', "SessionBuilderTests", conf=conf)
+            session = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session.conf.get("key1"), "value1")
+            self.assertEqual(session.conf.get("key2"), "value2")
+            self.assertEqual(session.sparkContext, sc)
+
+            self.assertFalse(sc.getConf().contains("key2"))
+            self.assertEqual(sc.getConf().get("key1"), "value1")
+        finally:
+            if session is not None:
+                session.stop()
+            if sc is not None:
+                sc.stop()
+
+    def test_another_spark_session(self):
+        session1 = None
+        session2 = None
+        try:
+            session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
+            session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session1.conf.get("key1"), "value1")
+            self.assertEqual(session2.conf.get("key1"), "value1")
+            self.assertEqual(session1.conf.get("key2"), "value2")
+            self.assertEqual(session2.conf.get("key2"), "value2")
+            self.assertEqual(session1.sparkContext, session2.sparkContext)
+
+            self.assertEqual(session1.sparkContext.getConf().get("key1"), "value1")
+            self.assertFalse(session1.sparkContext.getConf().contains("key2"))
+        finally:
+            if session1 is not None:
+                session1.stop()
+            if session2 is not None:
+                session2.stop()
+
+
 class SQLTests(ReusedSQLTestCase):
 
     @classmethod


