spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-15345][SQL][PYSPARK] SparkSession's conf doesn't take effect when there is already an existing SparkContext
Date Wed, 25 May 2016 17:46:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master b120fba6a -> 01e7b9c85


[SPARK-15345][SQL][PYSPARK] SparkSession's conf doesn't take effect when there is already an existing
SparkContext

## What changes were proposed in this pull request?

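Override the existing SparkContext's configuration if the provided SparkConf is different: the
existing SparkContext is reused, the builder's options are applied to its SparkConf, and a warning
is logged that some configuration may not take effect. The PySpark part hasn't been fixed yet; I
will do that after the first round of review, to make sure this is the correct approach.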

## How was this patch tested?

Manually verified in spark-shell. Also added a unit test ("create SparkContext first then SparkSession") to SparkSessionBuilderSuite.

rxin Please help review it; I think this is a very critical issue for Spark 2.0.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13160 from zjffdu/SPARK-15345.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01e7b9c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01e7b9c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01e7b9c8

Branch: refs/heads/master
Commit: 01e7b9c85bb84924e279021f9748774dce9702c8
Parents: b120fba
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed May 25 10:46:51 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed May 25 10:46:51 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala    |  3 +++
 .../scala/org/apache/spark/sql/SparkSession.scala     | 14 ++++++++++++--
 .../apache/spark/sql/SparkSessionBuilderSuite.scala   | 14 +++++++++++++-
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 36aa3be..5018eb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2254,6 +2254,9 @@ object SparkContext extends Logging {
       if (activeContext.get() == null) {
         setActiveContext(new SparkContext(config), allowMultipleContexts = false)
       }
+      if (config.getAll.nonEmpty) {
+        logWarning("Use an existing SparkContext, some configuration may not take effect.")
+      }
       activeContext.get()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5c87c84..86c97b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -636,7 +636,7 @@ object SparkSession {
   /**
    * Builder for [[SparkSession]].
    */
-  class Builder {
+  class Builder extends Logging {
 
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
 
@@ -753,6 +753,9 @@ object SparkSession {
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
         options.foreach { case (k, v) => session.conf.set(k, v) }
+        if (options.nonEmpty) {
+          logWarning("Use an existing SparkSession, some configuration may not take effect.")
+        }
         return session
       }
 
@@ -762,6 +765,9 @@ object SparkSession {
         session = defaultSession.get()
         if ((session ne null) && !session.sparkContext.isStopped) {
           options.foreach { case (k, v) => session.conf.set(k, v) }
+          if (options.nonEmpty) {
+            logWarning("Use an existing SparkSession, some configuration may not take effect.")
+          }
           return session
         }
 
@@ -774,7 +780,11 @@ object SparkSession {
 
           val sparkConf = new SparkConf()
           options.foreach { case (k, v) => sparkConf.set(k, v) }
-          SparkContext.getOrCreate(sparkConf)
+          val sc = SparkContext.getOrCreate(sparkConf)
+          // maybe this is an existing SparkContext, update its SparkConf which maybe used
+          // by SparkSession
+          options.foreach { case (k, v) => sc.conf.set(k, v) }
+          sc
         }
         session = new SparkSession(sparkContext)
         options.foreach { case (k, v) => session.conf.set(k, v) }

http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index ec6a2b3..786956d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
@@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
     assert(newSession != activeSession)
     newSession.stop()
   }
+
+  test("create SparkContext first then SparkSession") {
+    sparkContext.stop()
+    val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
+    val sparkContext2 = new SparkContext(conf)
+    val session = SparkSession.builder().config("key2", "value2").getOrCreate()
+    assert(session.conf.get("key1") == "value1")
+    assert(session.conf.get("key2") == "value2")
+    assert(session.sparkContext.conf.get("key1") == "value1")
+    assert(session.sparkContext.conf.get("key2") == "value2")
+    session.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message