spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-11700] [SQL] Remove thread local SQLContext in SparkPlan
Date Mon, 30 Nov 2015 18:32:18 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2db4662fe -> 17275fa99


[SPARK-11700] [SQL] Remove thread local SQLContext in SparkPlan

In 1.6, we introduced a public API for getting the SQLContext associated with the current
thread; SparkPlan should use that instead of keeping its own thread local.
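
For reference, a minimal sketch of the pattern this patch moves to. The setActive, getActive
and clearActive names are the ones used in the diff below; the withActive helper and the
ActiveContextSketch object are hypothetical and only illustrate the usage (these methods are
private[sql], so the sketch lives in that package):

package org.apache.spark.sql

// Hypothetical helper, not part of this patch: run a block with `ctx`
// published as the active SQLContext for the current thread, then clear
// it so the context is not leaked when the thread is reused.
private[sql] object ActiveContextSketch {
  def withActive[T](ctx: SQLContext)(body: => T): T = {
    SQLContext.setActive(ctx)        // what QueryExecution and SparkPlan.makeCopy now call
    try body
    finally SQLContext.clearActive() // remove the thread-local reference
  }
}

// SparkPlan then resolves its context from the current thread:
//   protected[spark] final val sqlContext = SQLContext.getActive().get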

Author: Davies Liu <davies@databricks.com>

Closes #9990 from davies/leak_context.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17275fa9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17275fa9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17275fa9

Branch: refs/heads/master
Commit: 17275fa99c670537c52843df405279a52b5c9594
Parents: 2db4662
Author: Davies Liu <davies@databricks.com>
Authored: Mon Nov 30 10:32:13 2015 -0800
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Mon Nov 30 10:32:13 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/SQLContext.scala  | 10 +++++-----
 .../apache/spark/sql/execution/QueryExecution.scala   |  3 +--
 .../org/apache/spark/sql/execution/SparkPlan.scala    | 14 ++++----------
 .../org/apache/spark/sql/MultiSQLContextsSuite.scala  |  2 +-
 .../sql/execution/ExchangeCoordinatorSuite.scala      |  2 +-
 .../sql/execution/RowFormatConvertersSuite.scala      |  4 ++--
 6 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c2ac5f..8d27839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,6 @@ import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkException, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -45,9 +44,10 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * The entry point for working with structured data (rows and columns) in Spark.  Allows the
@@ -401,7 +401,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
-    SparkPlan.currentContext.set(self)
+    SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
@@ -417,7 +417,7 @@ class SQLContext private[sql](
    */
   @Experimental
   def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
-    SparkPlan.currentContext.set(self)
+    SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
@@ -1334,7 +1334,7 @@ object SQLContext {
     activeContext.remove()
   }
 
-  private[sql] def getActiveContextOption(): Option[SQLContext] = {
+  private[sql] def getActive(): Option[SQLContext] = {
     Option(activeContext.get())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5da5aea..107570f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -42,9 +42,8 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
 
   lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
 
-  // TODO: Don't just pick the first one...
   lazy val sparkPlan: SparkPlan = {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     sqlContext.planner.plan(optimizedPlan).next()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 534a3bc..507641f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,21 +23,15 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
 import org.apache.spark.sql.types.DataType
 
-object SparkPlan {
-  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
-}
-
 /**
  * The base class for physical operators.
  */
@@ -49,7 +43,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected[spark] final val sqlContext = SparkPlan.currentContext.get()
+  protected[spark] final val sqlContext = SQLContext.getActive().get
 
   protected def sparkContext = sqlContext.sparkContext
 
@@ -69,7 +63,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /** Overridden make copy also propogates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     super.makeCopy(newArgs)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
index 34c5c68..162c0b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
@@ -27,7 +27,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
   private var sparkConf: SparkConf = _
 
   override protected def beforeAll(): Unit = {
-    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalActiveSQLContext = SQLContext.getActive()
     originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
 
     SQLContext.clearActive()

http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index b96d50a..180050b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -30,7 +30,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   private var originalInstantiatedSQLContext: Option[SQLContext] = _
 
   override protected def beforeAll(): Unit = {
-    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalActiveSQLContext = SQLContext.getActive()
     originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
 
     SQLContext.clearActive()

http://git-wip-us.apache.org/repos/asf/spark/blob/17275fa9/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 6876ab0..13d68a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
 import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -94,7 +94,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   }
 
   test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
-    SparkPlan.currentContext.set(sqlContext)
+    SQLContext.setActive(sqlContext)
     val schema = ArrayType(StringType)
     val rows = (1 to 100).map { i =>
       InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))

