spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-15012][SQL] Simplify configuration API further
Date Sat, 30 Apr 2016 03:46:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master b056e8cb0 -> 66773eb8a


[SPARK-15012][SQL] Simplify configuration API further

## What changes were proposed in this pull request?

1. Remove all the `spark.setConf` etc. Just expose `spark.conf`
2. Make `spark.conf` take in things set in the core `SparkConf` as well, otherwise users may
get confused

This was done for both the Python and Scala APIs.

## How was this patch tested?
`SQLConfSuite`, python tests.

This one fixes the failed tests in #12787

Closes #12787

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12798 from yhuai/conf-api.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66773eb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66773eb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66773eb8

Branch: refs/heads/master
Commit: 66773eb8a55bfe6437dd4096c2c55685aca29dcd
Parents: b056e8c
Author: Andrew Or <andrew@databricks.com>
Authored: Fri Apr 29 20:46:07 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Fri Apr 29 20:46:07 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/context.py                   |  4 +-
 python/pyspark/sql/session.py                   | 29 ------
 python/pyspark/sql/tests.py                     |  4 +-
 .../spark/sql/ContinuousQueryManager.scala      |  2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../spark/sql/RelationalGroupedDataset.scala    |  2 +-
 .../org/apache/spark/sql/RuntimeConfig.scala    | 26 ++++++
 .../scala/org/apache/spark/sql/SQLContext.scala | 28 ++++--
 .../org/apache/spark/sql/SparkSession.scala     | 95 +++-----------------
 .../sql/execution/command/SetCommand.scala      | 11 +--
 .../InsertIntoHadoopFsRelation.scala            |  2 +-
 .../datasources/parquet/ParquetRelation.scala   | 30 +++----
 .../execution/streaming/FileStreamSinkLog.scala |  6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  3 -
 .../spark/sql/internal/SessionState.scala       | 19 +---
 .../org/apache/spark/sql/SQLContextSuite.scala  |  6 --
 .../spark/sql/internal/SQLConfSuite.scala       | 16 +++-
 .../spark/sql/hive/HiveSessionState.scala       |  8 +-
 18 files changed, 108 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 417d719..2096236 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -114,7 +114,7 @@ class SQLContext(object):
     def setConf(self, key, value):
         """Sets the given Spark SQL configuration property.
         """
-        self.sparkSession.setConf(key, value)
+        self.sparkSession.conf.set(key, value)
 
     @ignore_unicode_prefix
     @since(1.3)
@@ -133,7 +133,7 @@ class SQLContext(object):
         >>> sqlContext.getConf("spark.sql.shuffle.partitions", u"10")
         u'50'
         """
-        return self.sparkSession.getConf(key, defaultValue)
+        return self.sparkSession.conf.get(key, defaultValue)
 
     @property
     @since("1.3.1")

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index c245261..35c36b4 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -134,35 +134,6 @@ class SparkSession(object):
             self._conf = RuntimeConfig(self._jsparkSession.conf())
         return self._conf
 
-    @since(2.0)
-    def setConf(self, key, value):
-        """
-        Sets the given Spark SQL configuration property.
-        """
-        self._jsparkSession.setConf(key, value)
-
-    @ignore_unicode_prefix
-    @since(2.0)
-    def getConf(self, key, defaultValue=None):
-        """Returns the value of Spark SQL configuration property for the given key.
-
-        If the key is not set and defaultValue is not None, return
-        defaultValue. If the key is not set and defaultValue is None, return
-        the system default value.
-
-        >>> spark.getConf("spark.sql.shuffle.partitions")
-        u'200'
-        >>> spark.getConf("spark.sql.shuffle.partitions", "10")
-        u'10'
-        >>> spark.setConf("spark.sql.shuffle.partitions", "50")
-        >>> spark.getConf("spark.sql.shuffle.partitions", "10")
-        u'50'
-        """
-        if defaultValue is not None:
-            return self._jsparkSession.getConf(key, defaultValue)
-        else:
-            return self._jsparkSession.getConf(key)
-
     @property
     @since(2.0)
     def catalog(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ea98206..4995b26 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1397,9 +1397,9 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_conf(self):
         spark = self.sparkSession
-        spark.setConf("bogo", "sipeo")
+        spark.conf.set("bogo", "sipeo")
         self.assertEqual(self.sparkSession.conf.get("bogo"), "sipeo")
-        spark.setConf("bogo", "ta")
+        spark.conf.set("bogo", "ta")
         self.assertEqual(spark.conf.get("bogo"), "ta")
         self.assertEqual(spark.conf.get("bogo", "not.read"), "ta")
         self.assertEqual(spark.conf.get("not.set", "ta"), "ta")

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 9e2e2d0..f82130c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -184,7 +184,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
       val analyzedPlan = df.queryExecution.analyzed
       df.queryExecution.assertAnalyzed()
 
-      if (sparkSession.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
         UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6a600c1..28f5ccd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -284,9 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         new Path(userSpecified).toUri.toString
       }.orElse {
         val checkpointConfig: Option[String] =
-          df.sparkSession.getConf(
-            SQLConf.CHECKPOINT_LOCATION,
-            None)
+          df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
 
         checkpointConfig.map { location =>
           new Path(location, queryName).toUri.toString

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 7ee9732..4f5bf63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -302,7 +302,7 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: String): RelationalGroupedDataset = {
     // This is to prevent unintended OOM errors when the number of distinct values is large
-    val maxValues = df.sparkSession.getConf(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+    val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
     // Get the distinct values of the column and sort them so its consistent
     val values = df.select(pivotColumn)
       .distinct()

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index f2e8515..670288b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.internal.SQLConf
 
+
 /**
  * Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]].
  *
@@ -78,6 +80,30 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
 
   /**
    * Returns the value of Spark runtime configuration property for the given key.
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  protected[sql] def get[T](entry: ConfigEntry[T]): T = {
+    sqlConf.getConf(entry)
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   */
+  protected[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
+    sqlConf.getConf(entry, default)
+  }
+
+  /**
+   * Returns all properties set in this conf.
+   *
+   * @since 2.0.0
+   */
+  def getAll: Map[String, String] = {
+    sqlConf.getAllConfs
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6dfac3d..ff633cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -134,13 +134,15 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def setConf(props: Properties): Unit = sparkSession.setConf(props)
+  def setConf(props: Properties): Unit = {
+    sessionState.conf.setConf(props)
+  }
 
   /**
    * Set the given Spark SQL configuration property.
    */
   private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
-    sparkSession.setConf(entry, value)
+    sessionState.conf.setConf(entry, value)
   }
 
   /**
@@ -149,7 +151,9 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = sparkSession.setConf(key, value)
+  def setConf(key: String, value: String): Unit = {
+    sparkSession.conf.set(key, value)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
@@ -157,13 +161,17 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String): String = sparkSession.getConf(key)
+  def getConf(key: String): String = {
+    sparkSession.conf.get(key)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue` in [[ConfigEntry]].
    */
-  private[sql] def getConf[T](entry: ConfigEntry[T]): T = sparkSession.getConf(entry)
+  private[sql] def getConf[T](entry: ConfigEntry[T]): T = {
+    sparkSession.conf.get(entry)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -171,7 +179,7 @@ class SQLContext private[sql](
    * desired one.
    */
   private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
-    sparkSession.getConf(entry, defaultValue)
+    sparkSession.conf.get(entry, defaultValue)
   }
 
   /**
@@ -181,7 +189,9 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String, defaultValue: String): String = sparkSession.getConf(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = {
+    sparkSession.conf.get(key, defaultValue)
+  }
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
@@ -190,7 +200,9 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def getAllConfs: immutable.Map[String, String] = sparkSession.getAllConfs
+  def getAllConfs: immutable.Map[String, String] = {
+    sparkSession.conf.getAll
+  }
 
   protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 11c0aaa..7d3ff9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -109,6 +109,18 @@ class SparkSession private(
   protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
 
   /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark and Hadoop
+   * configurations that are relevant to Spark SQL. When getting the value of a config,
+   * this defaults to the value set in the underlying [[SparkContext]], if any.
+   *
+   * @group config
+   * @since 2.0.0
+   */
+  @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
+
+  /**
    * :: Experimental ::
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
    * that listen for execution metrics.
@@ -187,89 +199,6 @@ class SparkSession private(
   }
 
 
-  /* -------------------------------------------------- *
-   |  Methods for accessing or mutating configurations  |
-   * -------------------------------------------------- */
-
-  /**
-   * Runtime configuration interface for Spark.
-   *
-   * This is the interface through which the user can get and set all Spark and Hadoop
-   * configurations that are relevant to Spark SQL. When getting the value of a config,
-   * this defaults to the value set in the underlying [[SparkContext]], if any.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
-
-  /**
-   * Set Spark SQL configuration properties.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  def setConf(props: Properties): Unit = sessionState.setConf(props)
-
-  /**
-   * Set the given Spark SQL configuration property.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  def getConf(key: String): String = sessionState.conf.getConfString(key)
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue`.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  def getConf(key: String, defaultValue: String): String = {
-    sessionState.conf.getConfString(key, defaultValue)
-  }
-
-  /**
-   * Return all the configuration properties that have been set (i.e. not the default).
-   * This creates a new copy of the config properties in the form of a Map.
-   *
-   * @group config
-   * @since 2.0.0
-   */
-  def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
-
-  /**
-   * Set the given Spark SQL configuration property.
-   */
-  protected[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
-    sessionState.setConf(entry, value)
-  }
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue` in [[ConfigEntry]].
-   */
-  protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
-   * desired one.
-   */
-  protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
-    sessionState.conf.getConf(entry, defaultValue)
-  }
-
-
   /* --------------------------------- *
    |  Methods for creating DataFrames  |
    * --------------------------------- */

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index bbb2a22..2409b5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -56,7 +56,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
               "determining the number of reducers is not supported."
           throw new IllegalArgumentException(msg)
         } else {
-          sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
           Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
         }
       }
@@ -65,7 +65,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
-        sparkSession.setConf(key, value)
+        sparkSession.conf.set(key, value)
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)
@@ -74,7 +74,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Queries all key-value pairs that are set in the SQLConf of the sparkSession.
     case None =>
       val runFunc = (sparkSession: SparkSession) => {
-        sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+        sparkSession.conf.getAll.map { case (k, v) => Row(k, v) }.toSeq
       }
       (keyValueOutput, runFunc)
 
@@ -107,10 +107,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Queries a single property.
     case Some((key, None)) =>
       val runFunc = (sparkSession: SparkSession) => {
-        val value =
-          try sparkSession.getConf(key) catch {
-            case _: NoSuchElementException => "<undefined>"
-          }
+        val value = sparkSession.conf.getOption(key).getOrElse("<undefined>")
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 4df7d0c..4921e4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
             dataColumns = dataColumns,
             inputSchema = query.output,
             PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
+            sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
             isAppend)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c689ad0..b1513bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -143,10 +143,9 @@ private[sql] class DefaultSource
       parameters
           .get(ParquetRelation.MERGE_SCHEMA)
           .map(_.toBoolean)
-          .getOrElse(sparkSession.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+          .getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
-    val mergeRespectSummaries =
-      sparkSession.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+    val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
 
     val filesByType = splitFiles(files)
 
@@ -281,22 +280,23 @@ private[sql] class DefaultSource
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
+      sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+      sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
     // Try to push down filters when filter push-down is enabled.
-    val pushed = if (sparkSession.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(requiredSchema, _))
-        .reduceOption(FilterApi.and)
-    } else {
-      None
-    }
+    val pushed =
+      if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index c548fbd..b694b61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,11 +80,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
    * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
    * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
    */
-  private val fileCleanupDelayMs = sparkSession.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
 
-  private val isDeletingExpiredLog = sparkSession.getConf(SQLConf.FILE_SINK_LOG_DELETION)
+  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
 
-  private val compactInterval = sparkSession.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7de7748..0bcf0f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -763,9 +763,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   }
 
   private def setConfWithCheck(key: String, value: String): Unit = {
-    if (key.startsWith("spark.") && !key.startsWith("spark.sql.")) {
-      logWarning(s"Attempt to set non-Spark SQL config in SQLConf: key = $key, value = $value")
-    }
     settings.put(key, value)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cacf50e..6fa044a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -152,9 +152,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   private val jarClassLoader: NonClosableMutableURLClassLoader =
     sparkSession.sharedState.jarClassLoader
 
-  // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+  // Automatically extract all entries and put it in our SQLConf
   // We need to call it after all of vals have been initialized.
-  setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
+  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
+    conf.setConfString(k, v)
+  }
 
   // ------------------------------------------------------
   //  Helper methods, partially leftover from pre-2.0 days
@@ -170,19 +172,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
   }
 
-  final def setConf(properties: Properties): Unit = {
-    properties.asScala.foreach { case (k, v) => setConf(k, v) }
-  }
-
-  final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
-    conf.setConf(entry, value)
-    setConf(entry.key, entry.stringConverter(value))
-  }
-
-  def setConf(key: String, value: String): Unit = {
-    conf.setConfString(key, value)
-  }
-
   def addJar(path: String): Unit = {
     sparkSession.sparkContext.addJar(path)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2f62ad4..1d5fc57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -79,10 +79,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
   }
 
-  test("SQLContext can access `spark.sql.*` configs") {
-    sc.conf.set("spark.sql.with.or.without.you", "my love")
-    val sqlContext = new SQLContext(sc)
-    assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e687e6a..b87f482 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.{QueryTest, SparkSession, SQLContext}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
 
 class SQLConfSuite extends QueryTest with SharedSQLContext {
@@ -125,4 +125,18 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
 
     sqlContext.conf.clear()
   }
+
+  test("SparkSession can access configs set in SparkConf") {
+    try {
+      sparkContext.conf.set("spark.to.be.or.not.to.be", "my love")
+      sparkContext.conf.set("spark.sql.with.or.without.you", "my love")
+      val spark = new SparkSession(sparkContext)
+      assert(spark.conf.get("spark.to.be.or.not.to.be") == "my love")
+      assert(spark.conf.get("spark.sql.with.or.without.you") == "my love")
+    } finally {
+      sparkContext.conf.remove("spark.to.be.or.not.to.be")
+      sparkContext.conf.remove("spark.sql.with.or.without.you")
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b17a88b..f307691 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.sql._
@@ -114,12 +113,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
    *  - allow SQL11 keywords to be used as identifiers
    */
   def setDefaultOverrideConfs(): Unit = {
-    setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
-  }
-
-  override def setConf(key: String, value: String): Unit = {
-    super.setConf(key, value)
-    metadataHive.runSqlHive(s"SET $key=$value")
+    conf.setConfString(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
   }
 
   override def addJar(path: String): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message