spark-commits mailing list archives

From: andrewo...@apache.org
Subject: [3/4] spark git commit: [SPARK-10176] [SQL] Show partially analyzed plans when checkAnswer fails to analyze
Date: Fri, 04 Sep 2015 22:17:44 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index fad93b0..cafa1d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row, SQLConf}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -31,14 +30,14 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
 
-class PlannerSuite extends SparkFunSuite with SharedSQLContext {
+class PlannerSuite extends SharedSQLContext {
   import testImplicits._
 
   setupTestData()
 
   private def testPartialAggregationPlan(query: LogicalPlan): Unit = {
-    val _ctx = ctx
-    import _ctx.planner._
+    val planner = sqlContext.planner
+    import planner._
     val plannedOption = HashAggregation(query).headOption.orElse(Aggregation(query).headOption)
     val planned =
       plannedOption.getOrElse(
@@ -53,8 +52,8 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("unions are collapsed") {
-    val _ctx = ctx
-    import _ctx.planner._
+    val planner = sqlContext.planner
+    import planner._
     val query = testData.unionAll(testData).unionAll(testData).logicalPlan
     val planned = BasicOperators(query).head
     val logicalUnions = query collect { case u: logical.Union => u }
@@ -81,33 +80,30 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
-    def checkPlan(fieldTypes: Seq[DataType], newThreshold: Int): Unit = {
-      ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold)
-      val fields = fieldTypes.zipWithIndex.map {
-        case (dataType, index) => StructField(s"c${index}", dataType, true)
-      } :+ StructField("key", IntegerType, true)
-      val schema = StructType(fields)
-      val row = Row.fromSeq(Seq.fill(fields.size)(null))
-      val rowRDD = ctx.sparkContext.parallelize(row :: Nil)
-      ctx.createDataFrame(rowRDD, schema).registerTempTable("testLimit")
-
-      val planned = sql(
-        """
-          |SELECT l.a, l.b
-          |FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a = r.key)
-        """.stripMargin).queryExecution.executedPlan
-
-      val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
-      val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
-
-      assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
-      assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
-
-      ctx.dropTempTable("testLimit")
+    def checkPlan(fieldTypes: Seq[DataType]): Unit = {
+      withTempTable("testLimit") {
+        val fields = fieldTypes.zipWithIndex.map {
+          case (dataType, index) => StructField(s"c${index}", dataType, true)
+        } :+ StructField("key", IntegerType, true)
+        val schema = StructType(fields)
+        val row = Row.fromSeq(Seq.fill(fields.size)(null))
+        val rowRDD = sparkContext.parallelize(row :: Nil)
+        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("testLimit")
+
+        val planned = sql(
+          """
+            |SELECT l.a, l.b
+            |FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a = r.key)
+          """.stripMargin).queryExecution.executedPlan
+
+        val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
+        val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+
+        assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
+        assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+      }
     }
 
-    val origThreshold = ctx.conf.autoBroadcastJoinThreshold
-
     val simpleTypes =
       NullType ::
       BooleanType ::
@@ -124,7 +120,9 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
       StringType ::
       BinaryType :: Nil
 
-    checkPlan(simpleTypes, newThreshold = 16434)
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "16434") {
+      checkPlan(simpleTypes)
+    }
 
     val complexTypes =
       ArrayType(DoubleType, true) ::
@@ -136,36 +134,37 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
         StructField("b", ArrayType(DoubleType), nullable = false),
         StructField("c", DoubleType, nullable = false))) :: Nil
 
-    checkPlan(complexTypes, newThreshold = 901617)
-
-    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "901617") {
+      checkPlan(complexTypes)
+    }
   }
 
   test("InMemoryRelation statistics propagation") {
-    val origThreshold = ctx.conf.autoBroadcastJoinThreshold
-    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920)
-
-    testData.limit(3).registerTempTable("tiny")
-    sql("CACHE TABLE tiny")
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "81920") {
+      withTempTable("tiny") {
+        testData.limit(3).registerTempTable("tiny")
+        sql("CACHE TABLE tiny")
 
-    val a = testData.as("a")
-    val b = ctx.table("tiny").as("b")
-    val planned = a.join(b, $"a.key" === $"b.key").queryExecution.executedPlan
+        val a = testData.as("a")
+        val b = sqlContext.table("tiny").as("b")
+        val planned = a.join(b, $"a.key" === $"b.key").queryExecution.executedPlan
 
-    val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
-    val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+        val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
+        val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
 
-    assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
-    assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+        assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
+        assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
 
-    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
+        sqlContext.clearCache()
+      }
+    }
   }
 
   test("efficient limit -> project -> sort") {
     {
       val query =
         testData.select('key, 'value).sort('key).limit(2).logicalPlan
-      val planned = ctx.planner.TakeOrderedAndProject(query)
+      val planned = sqlContext.planner.TakeOrderedAndProject(query)
       assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
       assert(planned.head.output === testData.select('key, 'value).logicalPlan.output)
     }
@@ -175,7 +174,7 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
       // into it.
       val query =
         testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan
-      val planned = ctx.planner.TakeOrderedAndProject(query)
+      val planned = sqlContext.planner.TakeOrderedAndProject(query)
       assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
       assert(planned.head.output === testData.select('value, 'key).logicalPlan.output)
     }

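Note on the PlannerSuite hunks above: the manual save/set/restore of AUTO_BROADCASTJOIN_THRESHOLD and the explicit dropTempTable call are replaced by the SQLTestUtils-style withSQLConf and withTempTable wrappers. A minimal sketch of the save-and-restore shape such a wrapper is assumed to have, written in plain Scala with stand-in names rather than Spark's actual helpers:

    // Illustrative stand-in, not Spark's implementation: remember the current
    // value, apply the override, run the body, and restore in a finally block
    // so the original value comes back even when an assertion fails.
    object LoanHelpers {
      def withSetting[T](get: => T, set: T => Unit, tempValue: T)(body: => Unit): Unit = {
        val saved = get
        set(tempValue)
        try body finally set(saved)
      }
    }

    // Usage, with a mutable map standing in for the SQL conf:
    object LoanHelpersExample extends App {
      val conf = scala.collection.mutable.Map("autoBroadcastJoinThreshold" -> "10485760")
      LoanHelpers.withSetting(
        conf("autoBroadcastJoinThreshold"),
        (v: String) => conf("autoBroadcastJoinThreshold") = v,
        "16434") {
        assert(conf("autoBroadcastJoinThreshold") == "16434")
      }
      assert(conf("autoBroadcastJoinThreshold") == "10485760")
    }
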
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index ef6ad59..4492e37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -39,20 +39,20 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
 
   test("planner should insert unsafe->safe conversions when required") {
     val plan = Limit(10, outputsUnsafe)
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
   }
 
   test("filter can process unsafe rows") {
     val plan = Filter(IsNull(IsNull(Literal(1))), outputsUnsafe)
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(getConverters(preparedPlan).size === 1)
     assert(preparedPlan.outputsUnsafeRows)
   }
 
   test("filter can process safe rows") {
     val plan = Filter(IsNull(IsNull(Literal(1))), outputsSafe)
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(getConverters(preparedPlan).isEmpty)
     assert(!preparedPlan.outputsUnsafeRows)
   }
@@ -67,33 +67,33 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   test("union requires all of its input rows' formats to agree") {
     val plan = Union(Seq(outputsSafe, outputsUnsafe))
     assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.outputsUnsafeRows)
   }
 
   test("union can process safe rows") {
     val plan = Union(Seq(outputsSafe, outputsSafe))
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(!preparedPlan.outputsUnsafeRows)
   }
 
   test("union can process unsafe rows") {
     val plan = Union(Seq(outputsUnsafe, outputsUnsafe))
-    val preparedPlan = ctx.prepareForExecution.execute(plan)
+    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.outputsUnsafeRows)
   }
 
   test("round trip with ConvertToUnsafe and ConvertToSafe") {
     val input = Seq(("hello", 1), ("world", 2))
     checkAnswer(
-      ctx.createDataFrame(input),
+      sqlContext.createDataFrame(input),
       plan => ConvertToSafe(ConvertToUnsafe(plan)),
       input.map(Row.fromTuple)
     )
   }
 
   test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
-    SparkPlan.currentContext.set(ctx)
+    SparkPlan.currentContext.set(sqlContext)
     val schema = ArrayType(StringType)
     val rows = (1 to 100).map { i =>
       InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 8fa77b0..3073d49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SortSuite extends SparkPlanTest with SharedSQLContext {
+  import testImplicits.localSeqToDataFrameHolder
 
   // This test was originally added as an example of how to use [[SparkPlanTest]];
   // it's not designed to be a comprehensive test of ExternalSort.

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 5ab8f44..de45ae4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -31,14 +31,7 @@ import org.apache.spark.sql.test.SQLTestUtils
  * class's test helper methods can be used, see [[SortSuite]].
  */
 private[sql] abstract class SparkPlanTest extends SparkFunSuite {
-  protected def _sqlContext: SQLContext
-
-  /**
-   * Creates a DataFrame from a local Seq of Product.
-   */
-  implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = {
-    _sqlContext.implicits.localSeqToDataFrameHolder(data)
-  }
+  protected def sqlContext: SQLContext
 
   /**
    * Runs the plan and makes sure the answer matches the expected result.
@@ -98,7 +91,7 @@ private[sql] abstract class SparkPlanTest extends SparkFunSuite {
       planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row],
       sortAnswers: Boolean = true): Unit = {
-    SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, _sqlContext) match {
+    SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, sqlContext) match {
       case Some(errorMessage) => fail(errorMessage)
       case None =>
     }
@@ -122,7 +115,7 @@ private[sql] abstract class SparkPlanTest extends SparkFunSuite {
       expectedPlanFunction: SparkPlan => SparkPlan,
       sortAnswers: Boolean = true): Unit = {
     SparkPlanTest.checkAnswer(
-        input, planFunction, expectedPlanFunction, sortAnswers, _sqlContext) match {
+        input, planFunction, expectedPlanFunction, sortAnswers, sqlContext) match {
       case Some(errorMessage) => fail(errorMessage)
       case None =>
     }
@@ -149,13 +142,13 @@ object SparkPlanTest {
       planFunction: SparkPlan => SparkPlan,
       expectedPlanFunction: SparkPlan => SparkPlan,
       sortAnswers: Boolean,
-      _sqlContext: SQLContext): Option[String] = {
+      sqlContext: SQLContext): Option[String] = {
 
     val outputPlan = planFunction(input.queryExecution.sparkPlan)
     val expectedOutputPlan = expectedPlanFunction(input.queryExecution.sparkPlan)
 
     val expectedAnswer: Seq[Row] = try {
-      executePlan(expectedOutputPlan, _sqlContext)
+      executePlan(expectedOutputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
@@ -170,7 +163,7 @@ object SparkPlanTest {
     }
 
     val actualAnswer: Seq[Row] = try {
-      executePlan(outputPlan, _sqlContext)
+      executePlan(outputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
@@ -210,12 +203,12 @@ object SparkPlanTest {
       planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row],
       sortAnswers: Boolean,
-      _sqlContext: SQLContext): Option[String] = {
+      sqlContext: SQLContext): Option[String] = {
 
     val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
 
     val sparkAnswer: Seq[Row] = try {
-      executePlan(outputPlan, _sqlContext)
+      executePlan(outputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
@@ -238,10 +231,10 @@ object SparkPlanTest {
     }
   }
 
-  private def executePlan(outputPlan: SparkPlan, _sqlContext: SQLContext): Seq[Row] = {
+  private def executePlan(outputPlan: SparkPlan, sqlContext: SQLContext): Seq[Row] = {
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = _sqlContext.prepareForExecution.execute(
+    val resolvedPlan = sqlContext.prepareForExecution.execute(
       outputPlan transform {
         case plan: SparkPlan =>
           val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap

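The SparkPlanTest change drops the per-suite localSeqToDataFrameHolder implicit and has each concrete suite import the shared one instead (see the SortSuite and TungstenSortSuite hunks). A rough sketch of that pattern of selectively importing an implicit conversion from a shared object, with made-up names rather than Spark's testImplicits:

    // Illustrative only: a shared implicits object plus an opt-in import,
    // standing in for testImplicits.localSeqToDataFrameHolder.
    import scala.language.implicitConversions

    object SharedImplicits {
      case class FrameHolder[A](data: Seq[A])       // stand-in for DataFrameHolder
      implicit def seqToHolder[A](data: Seq[A]): FrameHolder[A] = FrameHolder(data)
    }

    object ImportExample {
      import SharedImplicits.seqToHolder            // each suite opts in explicitly
      val holder: SharedImplicits.FrameHolder[Int] = Seq(1, 2, 3)  // conversion applies here
    }
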
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
index 3158458..7a0f0df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
@@ -29,15 +29,16 @@ import org.apache.spark.sql.types._
  * A test suite that generates randomized data to test the [[TungstenSort]] operator.
  */
 class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
+  import testImplicits.localSeqToDataFrameHolder
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    ctx.conf.setConf(SQLConf.CODEGEN_ENABLED, true)
+    sqlContext.conf.setConf(SQLConf.CODEGEN_ENABLED, true)
   }
 
   override def afterAll(): Unit = {
     try {
-      ctx.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
+      sqlContext.conf.unsetConf(SQLConf.CODEGEN_ENABLED)
     } finally {
       super.afterAll()
     }
@@ -64,8 +65,7 @@ class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
   }
 
   test("sorting updates peak execution memory") {
-    val sc = ctx.sparkContext
-    AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "unsafe external sort") {
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
       checkThatPlansAgree(
         (1 to 100).map(v => Tuple1(v)).toDF("a"),
         (child: SparkPlan) => TungstenSort('a.asc :: Nil, true, child),
@@ -83,8 +83,8 @@ class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
   ) {
     test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
       val inputData = Seq.fill(1000)(randomDataGenerator())
-      val inputDf = ctx.createDataFrame(
-        ctx.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
+      val inputDf = sqlContext.createDataFrame(
+        sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
         StructType(StructField("a", dataType, nullable = true) :: Nil)
       )
       assert(TungstenSort.supportsSchema(inputDf.schema))

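In the TungstenSortSuite hunk, afterAll now unsets CODEGEN_ENABLED instead of writing a cached default back, so the setting simply falls back to its default the next time it is read. A hedged ScalaTest-style sketch of that beforeAll/afterAll shape, using a plain map as a stand-in for the SQL conf and an illustrative key name:

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    class CodegenFlagSuite extends FunSuite with BeforeAndAfterAll {
      // Stand-in for the SQL conf: absence of a key means "use the default".
      private val conf = scala.collection.mutable.Map.empty[String, String]

      override def beforeAll(): Unit = {
        super.beforeAll()
        conf("spark.sql.codegen") = "true"        // force the flag on for the whole suite
      }

      override def afterAll(): Unit = {
        try conf.remove("spark.sql.codegen")      // unset rather than restore a remembered value
        finally super.afterAll()
      }

      test("flag is set while the suite runs") {
        assert(conf("spark.sql.codegen") == "true")
      }
    }
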
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
index 5fdb82b..afda0d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
@@ -37,7 +37,7 @@ class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLConte
       val newMutableProjection = (expr: Seq[Expression], schema: Seq[Attribute]) => {
         () => new InterpretedMutableProjection(expr, schema)
       }
-      val dummyAccum = SQLMetrics.createLongMetric(ctx.sparkContext, "dummy")
+      val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy")
       iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, 0,
         Seq.empty, newMutableProjection, Seq.empty, None, dummyAccum, dummyAccum)
       val numPages = iter.getHashMap.getNumDataPages

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 1174b27..6a18cc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -215,7 +215,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Complex field and type inferring with null in sampling") {
-    val jsonDF = ctx.read.json(jsonNullStruct)
+    val jsonDF = sqlContext.read.json(jsonNullStruct)
     val expectedSchema = StructType(
       StructField("headers", StructType(
         StructField("Charset", StringType, true) ::
@@ -234,7 +234,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Primitive field and type inferring") {
-    val jsonDF = ctx.read.json(primitiveFieldAndType)
+    val jsonDF = sqlContext.read.json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -262,7 +262,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Complex field and type inferring") {
-    val jsonDF = ctx.read.json(complexFieldAndType1)
+    val jsonDF = sqlContext.read.json(complexFieldAndType1)
 
     val expectedSchema = StructType(
       StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
@@ -361,7 +361,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("GetField operation on complex data type") {
-    val jsonDF = ctx.read.json(complexFieldAndType1)
+    val jsonDF = sqlContext.read.json(complexFieldAndType1)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -377,7 +377,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Type conflict in primitive field values") {
-    val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict)
+    val jsonDF = sqlContext.read.json(primitiveFieldValueTypeConflict)
 
     val expectedSchema = StructType(
       StructField("num_bool", StringType, true) ::
@@ -449,7 +449,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   ignore("Type conflict in primitive field values (Ignored)") {
-    val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict)
+    val jsonDF = sqlContext.read.json(primitiveFieldValueTypeConflict)
     jsonDF.registerTempTable("jsonTable")
 
     // Right now, the analyzer does not promote strings in a boolean expression.
@@ -502,7 +502,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Type conflict in complex field values") {
-    val jsonDF = ctx.read.json(complexFieldValueTypeConflict)
+    val jsonDF = sqlContext.read.json(complexFieldValueTypeConflict)
 
     val expectedSchema = StructType(
       StructField("array", ArrayType(LongType, true), true) ::
@@ -526,7 +526,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Type conflict in array elements") {
-    val jsonDF = ctx.read.json(arrayElementTypeConflict)
+    val jsonDF = sqlContext.read.json(arrayElementTypeConflict)
 
     val expectedSchema = StructType(
       StructField("array1", ArrayType(StringType, true), true) ::
@@ -554,7 +554,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Handling missing fields") {
-    val jsonDF = ctx.read.json(missingFields)
+    val jsonDF = sqlContext.read.json(missingFields)
 
     val expectedSchema = StructType(
       StructField("a", BooleanType, true) ::
@@ -573,9 +573,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalFile.toURI.toString
-    ctx.sparkContext.parallelize(1 to 100)
+    sparkContext.parallelize(1 to 100)
       .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
-    val jsonDF = ctx.read.option("samplingRatio", "0.49").json(path)
+    val jsonDF = sqlContext.read.option("samplingRatio", "0.49").json(path)
 
     val analyzed = jsonDF.queryExecution.analyzed
     assert(
@@ -590,7 +590,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
-      ctx.read.schema(schema).json(path)
+      sqlContext.read.schema(schema).json(path)
         .queryExecution.analyzed.asInstanceOf[LogicalRelation]
     val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
     assert(relationWithSchema.paths === Array(path))
@@ -603,7 +603,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     dir.delete()
     val path = dir.getCanonicalPath
     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
-    val jsonDF = ctx.read.json(path)
+    val jsonDF = sqlContext.read.json(path)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -672,7 +672,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       StructField("null", StringType, true) ::
       StructField("string", StringType, true) :: Nil)
 
-    val jsonDF1 = ctx.read.schema(schema).json(path)
+    val jsonDF1 = sqlContext.read.schema(schema).json(path)
 
     assert(schema === jsonDF1.schema)
 
@@ -689,7 +689,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       "this is a simple string.")
     )
 
-    val jsonDF2 = ctx.read.schema(schema).json(primitiveFieldAndType)
+    val jsonDF2 = sqlContext.read.schema(schema).json(primitiveFieldAndType)
 
     assert(schema === jsonDF2.schema)
 
@@ -710,7 +710,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   test("Applying schemas with MapType") {
     val schemaWithSimpleMap = StructType(
       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
-    val jsonWithSimpleMap = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
+    val jsonWithSimpleMap = sqlContext.read.schema(schemaWithSimpleMap).json(mapType1)
 
     jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
 
@@ -738,7 +738,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val schemaWithComplexMap = StructType(
       StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
 
-    val jsonWithComplexMap = ctx.read.schema(schemaWithComplexMap).json(mapType2)
+    val jsonWithComplexMap = sqlContext.read.schema(schemaWithComplexMap).json(mapType2)
 
     jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
 
@@ -764,7 +764,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-2096 Correctly parse dot notations") {
-    val jsonDF = ctx.read.json(complexFieldAndType2)
+    val jsonDF = sqlContext.read.json(complexFieldAndType2)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -782,7 +782,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-3390 Complex arrays") {
-    val jsonDF = ctx.read.json(complexFieldAndType2)
+    val jsonDF = sqlContext.read.json(complexFieldAndType2)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -805,7 +805,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-3308 Read top level JSON arrays") {
-    val jsonDF = ctx.read.json(jsonArray)
+    val jsonDF = sqlContext.read.json(jsonArray)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -823,64 +823,63 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("Corrupt records") {
     // Test if we can query corrupt records.
-    val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
-    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
-
-    val jsonDF = ctx.read.json(corruptRecords)
-    jsonDF.registerTempTable("jsonTable")
-
-    val schema = StructType(
-      StructField("_unparsed", StringType, true) ::
-      StructField("a", StringType, true) ::
-      StructField("b", StringType, true) ::
-      StructField("c", StringType, true) :: Nil)
-
-    assert(schema === jsonDF.schema)
-
-    // In HiveContext, backticks should be used to access columns starting with a underscore.
-    checkAnswer(
-      sql(
-        """
-          |SELECT a, b, c, _unparsed
-          |FROM jsonTable
-        """.stripMargin),
-      Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
-        Row(null, null, null, """{"a":1, b:2}""") ::
-        Row(null, null, null, """{"a":{, b:3}""") ::
-        Row("str_a_4", "str_b_4", "str_c_4", null) ::
-        Row(null, null, null, "]") :: Nil
-    )
-
-    checkAnswer(
-      sql(
-        """
-          |SELECT a, b, c
-          |FROM jsonTable
-          |WHERE _unparsed IS NULL
-        """.stripMargin),
-      Row("str_a_4", "str_b_4", "str_c_4")
-    )
-
-    checkAnswer(
-      sql(
-        """
-          |SELECT _unparsed
-          |FROM jsonTable
-          |WHERE _unparsed IS NOT NULL
-        """.stripMargin),
-      Row("{") ::
-        Row("") ::
-        Row("""{"a":1, b:2}""") ::
-        Row("""{"a":{, b:3}""") ::
-        Row("]") :: Nil
-    )
-
-    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      withTempTable("jsonTable") {
+        val jsonDF = sqlContext.read.json(corruptRecords)
+        jsonDF.registerTempTable("jsonTable")
+
+        val schema = StructType(
+          StructField("_unparsed", StringType, true) ::
+          StructField("a", StringType, true) ::
+          StructField("b", StringType, true) ::
+          StructField("c", StringType, true) :: Nil)
+
+        assert(schema === jsonDF.schema)
+
+        // In HiveContext, backticks should be used to access columns starting with a underscore.
+        checkAnswer(
+          sql(
+            """
+              |SELECT a, b, c, _unparsed
+              |FROM jsonTable
+            """.stripMargin),
+          Row(null, null, null, "{") ::
+            Row(null, null, null, "") ::
+            Row(null, null, null, """{"a":1, b:2}""") ::
+            Row(null, null, null, """{"a":{, b:3}""") ::
+            Row("str_a_4", "str_b_4", "str_c_4", null) ::
+            Row(null, null, null, "]") :: Nil
+        )
+
+        checkAnswer(
+          sql(
+            """
+              |SELECT a, b, c
+              |FROM jsonTable
+              |WHERE _unparsed IS NULL
+            """.stripMargin),
+          Row("str_a_4", "str_b_4", "str_c_4")
+        )
+
+        checkAnswer(
+          sql(
+            """
+              |SELECT _unparsed
+              |FROM jsonTable
+              |WHERE _unparsed IS NOT NULL
+            """.stripMargin),
+          Row("{") ::
+            Row("") ::
+            Row("""{"a":1, b:2}""") ::
+            Row("""{"a":{, b:3}""") ::
+            Row("]") :: Nil
+        )
+      }
+    }
   }
 
   test("SPARK-4068: nulls in arrays") {
-    val jsonDF = ctx.read.json(nullsInArrays)
+    val jsonDF = sqlContext.read.json(nullsInArrays)
     jsonDF.registerTempTable("jsonTable")
 
     val schema = StructType(
@@ -926,7 +925,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5)
     }
 
-    val df1 = ctx.createDataFrame(rowRDD1, schema1)
+    val df1 = sqlContext.createDataFrame(rowRDD1, schema1)
     df1.registerTempTable("applySchema1")
     val df2 = df1.toDF
     val result = df2.toJSON.collect()
@@ -949,7 +948,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
     }
 
-    val df3 = ctx.createDataFrame(rowRDD2, schema2)
+    val df3 = sqlContext.createDataFrame(rowRDD2, schema2)
     df3.registerTempTable("applySchema2")
     val df4 = df3.toDF
     val result2 = df4.toJSON.collect()
@@ -957,8 +956,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
     assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
-    val jsonDF = ctx.read.json(primitiveFieldAndType)
-    val primTable = ctx.read.json(jsonDF.toJSON)
+    val jsonDF = sqlContext.read.json(primitiveFieldAndType)
+    val primTable = sqlContext.read.json(jsonDF.toJSON)
     primTable.registerTempTable("primativeTable")
     checkAnswer(
         sql("select * from primativeTable"),
@@ -970,8 +969,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         "this is a simple string.")
       )
 
-    val complexJsonDF = ctx.read.json(complexFieldAndType1)
-    val compTable = ctx.read.json(complexJsonDF.toJSON)
+    val complexJsonDF = sqlContext.read.json(complexFieldAndType1)
+    val compTable = sqlContext.read.json(complexJsonDF.toJSON)
     compTable.registerTempTable("complexTable")
     // Access elements of a primitive array.
     checkAnswer(
@@ -1039,25 +1038,25 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Some(empty),
       1.0,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(ctx)
+      None, None)(sqlContext)
     val logicalRelation0 = LogicalRelation(relation0)
     val relation1 = new JSONRelation(
       Some(singleRow),
       1.0,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(ctx)
+      None, None)(sqlContext)
     val logicalRelation1 = LogicalRelation(relation1)
     val relation2 = new JSONRelation(
       Some(singleRow),
       0.5,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(ctx)
+      None, None)(sqlContext)
     val logicalRelation2 = LogicalRelation(relation2)
     val relation3 = new JSONRelation(
       Some(singleRow),
       1.0,
       Some(StructType(StructField("b", IntegerType, true) :: Nil)),
-      None, None)(ctx)
+      None, None)(sqlContext)
     val logicalRelation3 = LogicalRelation(relation3)
 
     assert(relation0 !== relation1)
@@ -1078,18 +1077,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     withTempPath(dir => {
       val path = dir.getCanonicalFile.toURI.toString
-      ctx.sparkContext.parallelize(1 to 100)
+      sparkContext.parallelize(1 to 100)
         .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
 
       val d1 = ResolvedDataSource(
-        ctx,
+        sqlContext,
         userSpecifiedSchema = None,
         partitionColumns = Array.empty[String],
         provider = classOf[DefaultSource].getCanonicalName,
         options = Map("path" -> path))
 
       val d2 = ResolvedDataSource(
-        ctx,
+        sqlContext,
         userSpecifiedSchema = None,
         partitionColumns = Array.empty[String],
         provider = classOf[DefaultSource].getCanonicalName,
@@ -1105,24 +1104,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-7565 MapType in JsonRDD") {
-    val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
-    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
-
-    val schemaWithSimpleMap = StructType(
-      StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
-    try {
-      val temp = Utils.createTempDir().getPath
-
-      val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
-      df.write.mode("overwrite").parquet(temp)
-      // order of MapType is not defined
-      assert(ctx.read.parquet(temp).count() == 5)
-
-      val df2 = ctx.read.json(corruptRecords)
-      df2.write.mode("overwrite").parquet(temp)
-      checkAnswer(ctx.read.parquet(temp), df2.collect())
-    } finally {
-      ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      withTempDir { dir =>
+        val schemaWithSimpleMap = StructType(
+          StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+        val df = sqlContext.read.schema(schemaWithSimpleMap).json(mapType1)
+
+        val path = dir.getAbsolutePath
+        df.write.mode("overwrite").parquet(path)
+        // order of MapType is not defined
+        assert(sqlContext.read.parquet(path).count() == 5)
+
+        val df2 = sqlContext.read.json(corruptRecords)
+        df2.write.mode("overwrite").parquet(path)
+        checkAnswer(sqlContext.read.parquet(path), df2.collect())
+      }
     }
   }
 
@@ -1142,19 +1138,19 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val d1 = new File(root, "d1=1")
       // root/dt=1/col1=abc
       val p1_col1 = makePartition(
-        ctx.sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""),
+        sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""),
         d1,
         "col1",
         "abc")
 
       // root/dt=1/col1=abd
       val p2 = makePartition(
-        ctx.sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""),
+        sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""),
         d1,
         "col1",
         "abd")
 
-        ctx.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part")
+        sqlContext.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part")
         checkAnswer(sql(
           "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4))
         checkAnswer(sql(

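The rewritten "Corrupt records" and "SPARK-7565 MapType in JsonRDD" tests nest withSQLConf around withTempTable / withTempDir, so cleanup runs in reverse order of setup even when a checkAnswer assertion throws. A simplified sketch of that nesting with stand-in helpers (the real ones come from SQLTestUtils):

    // Each helper runs its body and performs its own cleanup in a finally
    // block; nesting them yields last-in-first-out cleanup on any exit path.
    object NestingExample extends App {
      def withConfLike(set: () => Unit, restore: () => Unit)(body: => Unit): Unit = {
        set(); try body finally restore()
      }
      def withTempTableLike(drop: () => Unit)(body: => Unit): Unit = {
        try body finally drop()
      }

      withConfLike(() => println("set columnNameOfCorruptRecord"),
                   () => println("restore columnNameOfCorruptRecord")) {
        withTempTableLike(() => println("drop jsonTable")) {
          println("register jsonTable and run checkAnswer assertions")
        }
      }
    }
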
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 2864181..713d1da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -21,10 +21,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 
 private[json] trait TestJsonData {
-  protected def _sqlContext: SQLContext
+  protected def sqlContext: SQLContext
 
   def primitiveFieldAndType: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"string":"this is a simple string.",
           "integer":10,
           "long":21474836470,
@@ -35,7 +35,7 @@ private[json] trait TestJsonData {
       }"""  :: Nil)
 
   def primitiveFieldValueTypeConflict: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
           "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
       """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
@@ -46,14 +46,14 @@ private[json] trait TestJsonData {
           "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
 
   def jsonNullStruct: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
         """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
         """{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
         """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
 
   def complexFieldValueTypeConflict: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"num_struct":11, "str_array":[1, 2, 3],
           "array":[], "struct_array":[], "struct": {}}""" ::
       """{"num_struct":{"field":false}, "str_array":null,
@@ -64,14 +64,14 @@ private[json] trait TestJsonData {
           "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
 
   def arrayElementTypeConflict: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
           "array2": [{"field":214748364700}, {"field":1}]}""" ::
       """{"array3": [{"field":"str"}, {"field":1}]}""" ::
       """{"array3": [1, 2, 3]}""" :: Nil)
 
   def missingFields: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"a":true}""" ::
       """{"b":21474836470}""" ::
       """{"c":[33, 44]}""" ::
@@ -79,7 +79,7 @@ private[json] trait TestJsonData {
       """{"e":"str"}""" :: Nil)
 
   def complexFieldAndType1: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"struct":{"field1": true, "field2": 92233720368547758070},
           "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
           "arrayOfString":["str1", "str2"],
@@ -95,7 +95,7 @@ private[json] trait TestJsonData {
          }"""  :: Nil)
 
   def complexFieldAndType2: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
           "complexArrayOfStruct": [
           {
@@ -149,7 +149,7 @@ private[json] trait TestJsonData {
       }""" :: Nil)
 
   def mapType1: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"map": {"a": 1}}""" ::
       """{"map": {"b": 2}}""" ::
       """{"map": {"c": 3}}""" ::
@@ -157,7 +157,7 @@ private[json] trait TestJsonData {
       """{"map": {"e": null}}""" :: Nil)
 
   def mapType2: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
       """{"map": {"b": {"field2": 2}}}""" ::
       """{"map": {"c": {"field1": [], "field2": 4}}}""" ::
@@ -166,21 +166,21 @@ private[json] trait TestJsonData {
       """{"map": {"f": {"field1": null}}}""" :: Nil)
 
   def nullsInArrays: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{"field1":[[null], [[["Test"]]]]}""" ::
       """{"field2":[null, [{"Test":1}]]}""" ::
       """{"field3":[[null], [{"Test":"2"}]]}""" ::
       """{"field4":[[null, [1,2,3]]]}""" :: Nil)
 
   def jsonArray: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """[{"a":"str_a_1"}]""" ::
       """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """[]""" :: Nil)
 
   def corruptRecords: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{""" ::
       """""" ::
       """{"a":1, b:2}""" ::
@@ -189,7 +189,7 @@ private[json] trait TestJsonData {
       """]""" :: Nil)
 
   def emptyRecords: RDD[String] =
-    _sqlContext.sparkContext.parallelize(
+    sqlContext.sparkContext.parallelize(
       """{""" ::
         """""" ::
         """{"a": {}}""" ::
@@ -198,7 +198,7 @@ private[json] trait TestJsonData {
         """]""" :: Nil)
 
 
-  lazy val singleRow: RDD[String] = _sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
+  lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
-  def empty: RDD[String] = _sqlContext.sparkContext.parallelize(Seq[String]())
+  def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index 91f3ce4..0835bd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -39,12 +39,13 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
 
   protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
     val fsPath = new Path(path)
-    val fs = fsPath.getFileSystem(configuration)
+    val fs = fsPath.getFileSystem(hadoopConfiguration)
     val parquetFiles = fs.listStatus(fsPath, new PathFilter {
       override def accept(path: Path): Boolean = pathFilter(path)
     }).toSeq.asJava
 
-    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
+    val footers =
+      ParquetFileReader.readAllFootersInParallel(hadoopConfiguration, parquetFiles, true)
     footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema
   }
 

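These hunks rename the helper used to build the FileSystem and read Parquet footers from configuration to hadoopConfiguration. Presumably the shared test trait exposes the SparkContext's Hadoop configuration under that name; a minimal sketch of such an accessor (an assumption, not the actual SQLTestUtils code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkContext

    trait HadoopConfAccess {
      protected def sparkContext: SparkContext
      // Assumed shape: surface the shared Hadoop Configuration under the new name.
      protected def hadoopConfiguration: Configuration = sparkContext.hadoopConfiguration
    }
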
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 08d2b9d..cd552e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -101,7 +101,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
   test("fixed-length decimals") {
     def makeDecimalRDD(decimal: DecimalType): DataFrame =
-      sqlContext.sparkContext
+      sparkContext
         .parallelize(0 to 1000)
         .map(i => Tuple1(i / 100.0))
         .toDF()
@@ -119,7 +119,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
   test("date type") {
     def makeDateRDD(): DataFrame =
-      sqlContext.sparkContext
+      sparkContext
         .parallelize(0 to 1000)
         .map(i => Tuple1(DateTimeUtils.toJavaDate(i)))
         .toDF()
@@ -207,7 +207,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("compression codec") {
     def compressionCodecFor(path: String): String = {
       val codecs = ParquetTypesConverter
-        .readMetaData(new Path(path), Some(configuration)).getBlocks.asScala
+        .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala
         .flatMap(_.getColumns.asScala)
         .map(_.getCodec.name())
         .distinct
@@ -277,14 +277,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("write metadata") {
     withTempPath { file =>
       val path = new Path(file.toURI.toString)
-      val fs = FileSystem.getLocal(configuration)
+      val fs = FileSystem.getLocal(hadoopConfiguration)
       val attributes = ScalaReflection.attributesFor[(Int, String)]
-      ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+      ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration)
 
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
+      val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration))
       val actualSchema = metaData.getFileMetaData.getSchema
       val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
 
@@ -355,7 +355,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       val path = new Path(location.getCanonicalPath)
 
       ParquetFileWriter.writeMetadataFile(
-        sqlContext.sparkContext.hadoopConfiguration,
+        sparkContext.hadoopConfiguration,
         path,
         Collections.singletonList(
           new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))))
@@ -370,12 +370,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("SPARK-6352 DirectParquetOutputCommitter") {
-    val clonedConf = new Configuration(configuration)
+    val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.
     // _temporary should be missing if direct output committer works.
     try {
-      configuration.set("spark.sql.parquet.output.committer.class",
+      hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
         classOf[DirectParquetOutputCommitter].getCanonicalName)
       sqlContext.udf.register("div0", (x: Int) => x / 0)
       withTempPath { dir =>
@@ -383,23 +383,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
           sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
         }
         val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(configuration)
+        val fs = path.getFileSystem(hadoopConfiguration)
         assert(!fs.exists(path))
       }
     } finally {
       // Hadoop 1 doesn't have `Configuration.unset`
-      configuration.clear()
-      clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      hadoopConfiguration.clear()
+      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
     }
   }
 
   test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
-    val clonedConf = new Configuration(configuration)
+    val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.
     // _temporary should be missing if direct output committer works.
     try {
-      configuration.set("spark.sql.parquet.output.committer.class",
+      hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
       sqlContext.udf.register("div0", (x: Int) => x / 0)
       withTempPath { dir =>
@@ -407,25 +407,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
           sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
         }
         val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(configuration)
+        val fs = path.getFileSystem(hadoopConfiguration)
         assert(!fs.exists(path))
       }
     } finally {
       // Hadoop 1 doesn't have `Configuration.unset`
-      configuration.clear()
-      clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      hadoopConfiguration.clear()
+      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
     }
   }
 
 
   test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
     withTempPath { dir =>
-      val clonedConf = new Configuration(configuration)
+      val clonedConf = new Configuration(hadoopConfiguration)
 
-      configuration.set(
+      hadoopConfiguration.set(
         SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
 
-      configuration.set(
+      hadoopConfiguration.set(
         "spark.sql.parquet.output.committer.class",
         classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
 
@@ -436,8 +436,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         assert(message === "Intentional exception for testing purposes")
       } finally {
         // Hadoop 1 doesn't have `Configuration.unset`
-        configuration.clear()
-        clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
       }
     }
   }
@@ -455,11 +455,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    val clonedConf = new Configuration(configuration)
+    val clonedConf = new Configuration(hadoopConfiguration)
 
     // Using a output committer that always fail when committing a task, so that both
     // `commitTask()` and `abortTask()` are invoked.
-    configuration.set(
+    hadoopConfiguration.set(
       "spark.sql.parquet.output.committer.class",
       classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
 
@@ -483,8 +483,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     } finally {
       // Hadoop 1 doesn't have `Configuration.unset`
-      configuration.clear()
-      clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      hadoopConfiguration.clear()
+      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
     }
   }
 }

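Several ParquetIOSuite tests mutate the shared Hadoop configuration and, as the in-code comments note, cannot call Configuration.unset on Hadoop 1, so they snapshot the conf up front and copy it back in a finally block. A small sketch of that clone-and-restore idiom, factored into a reusable helper (the helper itself is illustrative, not part of the commit):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration

    object ConfSnapshot {
      def withClonedConf(conf: Configuration)(body: => Unit): Unit = {
        val snapshot = new Configuration(conf)   // copy of every current entry
        try body
        finally {
          conf.clear()                           // drop whatever the body added or changed
          snapshot.asScala.foreach(e => conf.set(e.getKey, e.getValue))
        }
      }
      // Usage: ConfSnapshot.withClonedConf(hadoopConf) { hadoopConf.set("k", "v"); /* test */ }
    }
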
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index ed8bafb..7bac860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -517,7 +517,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
 
     val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
-    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(row :: Nil), schema)
+    val df = sqlContext.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
 
     withTempPath { dir =>
       df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index a379523..9edbb52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
  * A test suite that tests various Parquet queries.
  */
 class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
+  import testImplicits._
 
   test("simple select queries") {
     withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
@@ -40,22 +41,22 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
 
   test("appending") {
     val data = (0 until 10).map(i => (i, i.toString))
-    ctx.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
+    sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
     withParquetTable(data, "t") {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
-      checkAnswer(ctx.table("t"), (data ++ data).map(Row.fromTuple))
+      checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    ctx.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(Seq("tmp"))
   }
 
   test("overwriting") {
     val data = (0 until 10).map(i => (i, i.toString))
-    ctx.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
+    sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
     withParquetTable(data, "t") {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
-      checkAnswer(ctx.table("t"), data.map(Row.fromTuple))
+      checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    ctx.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(Seq("tmp"))
   }
 
   test("self-join") {
@@ -118,9 +119,9 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     val schema = StructType(List(StructField("d", DecimalType(18, 0), false),
       StructField("time", TimestampType, false)).toArray)
     withTempPath { file =>
-      val df = ctx.createDataFrame(ctx.sparkContext.parallelize(data), schema)
+      val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema)
       df.write.parquet(file.getCanonicalPath)
-      val df2 = ctx.read.parquet(file.getCanonicalPath)
+      val df2 = sqlContext.read.parquet(file.getCanonicalPath)
       checkAnswer(df2, df.collect().toSeq)
     }
   }
@@ -129,12 +130,12 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>
         val basePath = dir.getCanonicalPath
-        ctx.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-        ctx.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
         // Delete summary files so that, if we don't merge part-files, one column will not be included.
         Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
         Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
-        assert(ctx.read.parquet(basePath).columns.length === expectedColumnNumber)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
       }
     }
 
@@ -153,9 +154,9 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>
         val basePath = dir.getCanonicalPath
-        ctx.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-        ctx.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
-        assert(ctx.read.parquet(basePath).columns.length === expectedColumnNumber)
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
       }
     }
 
@@ -171,19 +172,19 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
-      ctx.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      ctx.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString)
+      sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+      sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString)
 
       // Disables the global SQL option for schema merging
       withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
         assertResult(2) {
           // Disables schema merging via data source option
-          ctx.read.option("mergeSchema", "false").parquet(basePath).columns.length
+          sqlContext.read.option("mergeSchema", "false").parquet(basePath).columns.length
         }
 
         assertResult(3) {
           // Enables schema merging via data source option
-          ctx.read.option("mergeSchema", "true").parquet(basePath).columns.length
+          sqlContext.read.option("mergeSchema", "true").parquet(basePath).columns.length
         }
       }
     }
@@ -193,7 +194,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
       val schema = StructType(Array(StructField("name", DecimalType(10, 5), false)))
-      val rowRDD = sqlContext.sparkContext.parallelize(Array(Row(Decimal("67123.45"))))
+      val rowRDD = sparkContext.parallelize(Array(Row(Decimal("67123.45"))))
       val df = sqlContext.createDataFrame(rowRDD, schema)
       df.write.parquet(basePath)
 
@@ -203,9 +204,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   }
 
   test("SPARK-10005 Schema merging for nested struct") {
-    val sqlContext = _sqlContext
-    import sqlContext.implicits._
-
     withTempPath { dir =>
       val path = dir.getCanonicalPath
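
The mergeSchema behaviour exercised earlier in this file boils down to the sketch below, assuming a SQLContext named `sqlContext` and a scratch directory `basePath`: two Parquet directories under one base path carry disjoint columns, and the option decides whether their schemas are unioned at read time.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SQLContext

    def schemaMergingSketch(sqlContext: SQLContext, basePath: String): Unit = {
      // Two "partitions" with different column sets under the same base path.
      sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
      sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)

      // Merging off: one part's schema plus the partition column "foo" -> 2 columns.
      assert(sqlContext.read.option("mergeSchema", "false").parquet(basePath).columns.length == 2)
      // Merging on: "a", "b" and "foo" -> 3 columns.
      assert(sqlContext.read.option("mergeSchema", "true").parquet(basePath).columns.length == 3)
    }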
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 5dbc7d1..442fafb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  * In particular, `Tuple1.apply` can be used to easily wrap a single type/value.
  */
 private[sql] trait ParquetTest extends SQLTestUtils {
-  protected def _sqlContext: SQLContext
 
   /**
    * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -43,7 +42,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
       (data: Seq[T])
       (f: String => Unit): Unit = {
     withTempPath { file =>
-      _sqlContext.createDataFrame(data).write.parquet(file.getCanonicalPath)
+      sqlContext.createDataFrame(data).write.parquet(file.getCanonicalPath)
       f(file.getCanonicalPath)
     }
   }
@@ -55,7 +54,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
   protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: DataFrame => Unit): Unit = {
-    withParquetFile(data)(path => f(_sqlContext.read.parquet(path)))
+    withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
   }
 
   /**
@@ -67,14 +66,14 @@ private[sql] trait ParquetTest extends SQLTestUtils {
       (data: Seq[T], tableName: String)
       (f: => Unit): Unit = {
     withParquetDataFrame(data) { df =>
-      _sqlContext.registerDataFrameAsTable(df, tableName)
+      sqlContext.registerDataFrameAsTable(df, tableName)
       withTempTable(tableName)(f)
     }
   }
 
   protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    _sqlContext.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
+    sqlContext.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
   }
 
   protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
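
With `_sqlContext` gone, these helpers resolve `sqlContext` straight from `SQLTestUtils`, and call sites in the suites stay unchanged. Typical usage from a suite mixing in `ParquetTest`, as a rough sketch (the query and the expected rows are illustrative only):

    // The data is written to a temporary Parquet file, registered as the temp table
    // "t", and both are cleaned up when the block exits. Tuple fields surface as
    // columns named _1 and _2.
    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
      checkAnswer(sql("SELECT _1 FROM t WHERE _1 < 5"), (0 until 5).map(Row(_)))
    }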

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 53a0e53..dcbfdca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -33,8 +33,7 @@ import org.apache.spark.sql.{SQLConf, SQLContext, QueryTest}
  * without serializing the hashed relation, which does not happen in local mode.
  */
 class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
-  private var sc: SparkContext = null
-  private var sqlContext: SQLContext = null
+  protected var sqlContext: SQLContext = null
 
   /**
    * Create a new [[SQLContext]] running in local-cluster mode with unsafe and codegen enabled.
@@ -44,15 +43,14 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
     val conf = new SparkConf()
       .setMaster("local-cluster[2,1,1024]")
       .setAppName("testing")
-    sc = new SparkContext(conf)
+    val sc = new SparkContext(conf)
     sqlContext = new SQLContext(sc)
     sqlContext.setConf(SQLConf.UNSAFE_ENABLED, true)
     sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
   }
 
   override def afterAll(): Unit = {
-    sc.stop()
-    sc = null
+    sqlContext.sparkContext.stop()
     sqlContext = null
   }
 
@@ -60,7 +58,7 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
    * Test whether the specified broadcast join updates the peak execution memory accumulator.
    */
   private def testBroadcastJoin[T: ClassTag](name: String, joinType: String): Unit = {
-    AccumulatorSuite.verifyPeakExecutionMemorySet(sc, name) {
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sqlContext.sparkContext, name) {
       val df1 = sqlContext.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
       val df2 = sqlContext.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
       // Comparison at the end is for broadcast left semi join
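
Pulled together, the lifecycle this suite now relies on looks roughly like the sketch below: only the SQLContext is kept as a field, and the SparkContext is always reached through it, so setup and teardown cannot drift apart. (Sketch only; in the suite these are BeforeAndAfterAll overrides.)

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLConf, SQLContext}

    var sqlContext: SQLContext = null

    def beforeAll(): Unit = {
      // local-cluster mode forces broadcast relations to be serialized across JVMs,
      // which plain local mode would not exercise.
      val conf = new SparkConf()
        .setMaster("local-cluster[2,1,1024]")
        .setAppName("testing")
      sqlContext = new SQLContext(new SparkContext(conf))
      sqlContext.setConf(SQLConf.UNSAFE_ENABLED, true)
      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
    }

    def afterAll(): Unit = {
      // The SparkContext is owned via the SQLContext, so stop it through that handle.
      sqlContext.sparkContext.stop()
      sqlContext = null
    }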

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 4c9187a..e5fd9e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -37,7 +37,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("GeneralHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(ctx.sparkContext, "data")
+    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
     val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
     assert(hashed.isInstanceOf[GeneralHashedRelation])
 
@@ -53,7 +53,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("UniqueKeyHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(ctx.sparkContext, "data")
+    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
     val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
     assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
 
@@ -73,7 +73,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("UnsafeHashedRelation") {
     val schema = StructType(StructField("a", IntegerType, true) :: Nil)
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(ctx.sparkContext, "data")
+    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
     val toUnsafe = UnsafeProjection.create(schema)
     val unsafeData = data.map(toUnsafe(_).copy()).toArray
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index cc649b9..4174ee0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
+  import testImplicits.localSeqToDataFrameHolder
 
-  private lazy val myUpperCaseData = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val myUpperCaseData = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(1, "A"),
       Row(2, "B"),
       Row(3, "C"),
@@ -39,8 +40,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
       Row(null, "G")
     )), new StructType().add("N", IntegerType).add("L", StringType))
 
-  private lazy val myLowerCaseData = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val myLowerCaseData = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(1, "a"),
       Row(2, "b"),
       Row(3, "c"),

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index a1a617d..c2e0bda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType}
 
 class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
 
-  private lazy val left = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val left = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(1, 2.0),
       Row(2, 100.0),
       Row(2, 1.0), // This row is duplicated to ensure that we will have multiple buffered matches
@@ -40,8 +40,8 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
       Row(null, null)
     )), new StructType().add("a", IntegerType).add("b", DoubleType))
 
-  private lazy val right = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val right = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(0, 0.0),
       Row(2, 3.0), // This row is duplicated to ensure that we will have multiple buffered matches
       Row(2, -1.0),

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
index baa86e3..3afd762 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
 
 class SemiJoinSuite extends SparkPlanTest with SharedSQLContext {
 
-  private lazy val left = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val left = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(1, 2.0),
       Row(1, 2.0),
       Row(2, 1.0),
@@ -40,8 +40,8 @@ class SemiJoinSuite extends SparkPlanTest with SharedSQLContext {
       Row(6, null)
     )), new StructType().add("a", IntegerType).add("b", DoubleType))
 
-  private lazy val right = ctx.createDataFrame(
-    ctx.sparkContext.parallelize(Seq(
+  private lazy val right = sqlContext.createDataFrame(
+    sparkContext.parallelize(Seq(
       Row(2, 3.0),
       Row(2, 3.0),
       Row(3, 2.0),
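
The inner, outer, and semi join suites all build their inputs the same way, which this change makes uniform: a lazily constructed DataFrame from an RDD[Row] plus an explicit schema, so that null keys and duplicated keys can be placed deliberately. A condensed sketch (values are illustrative):

    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

    def joinInput(sqlContext: SQLContext): DataFrame = sqlContext.createDataFrame(
      sqlContext.sparkContext.parallelize(Seq(
        Row(1, 2.0),
        Row(2, 1.0),
        Row(2, 1.0),      // duplicated key: exercises multiple buffered matches
        Row(null, null)   // null key: must never join
      )), new StructType().add("a", IntegerType).add("b", DoubleType))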

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 80006bf..6afffae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -36,7 +36,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._
 
   test("LongSQLMetric should not box Long") {
-    val l = SQLMetrics.createLongMetric(ctx.sparkContext, "long")
+    val l = SQLMetrics.createLongMetric(sparkContext, "long")
     val f = () => {
       l += 1L
       l.add(1L)
@@ -50,7 +50,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
 
   test("Normal accumulator should do boxing") {
     // We need this test to make sure BoxingFinder works.
-    val l = ctx.sparkContext.accumulator(0L)
+    val l = sparkContext.accumulator(0L)
     val f = () => { l += 1L }
     BoxingFinder.getClassReader(f.getClass).foreach { cl =>
       val boxingFinder = new BoxingFinder()
@@ -71,19 +71,19 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       df: DataFrame,
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-    val previousExecutionIds = ctx.listener.executionIdToData.keySet
+    val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
     df.collect()
-    ctx.sparkContext.listenerBus.waitUntilEmpty(10000)
-    val executionIds = ctx.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
     assert(executionIds.size === 1)
     val executionId = executionIds.head
-    val jobs = ctx.listener.getExecution(executionId).get.jobs
+    val jobs = sqlContext.listener.getExecution(executionId).get.jobs
     // Use "<=" because there is a race condition where we may miss some jobs
     // TODO Change it to "=" once we fix the race condition of missing the JobStarted event.
     assert(jobs.size <= expectedNumOfJobs)
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
-      val metricValues = ctx.listener.getExecutionMetrics(executionId)
+      val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
@@ -474,19 +474,19 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
 
   test("save metrics") {
     withTempPath { file =>
-      val previousExecutionIds = ctx.listener.executionIdToData.keySet
+      val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
       // Assume the execution plan is
       // PhysicalRDD(nodeId = 0)
       person.select('name).write.format("json").save(file.getAbsolutePath)
-      ctx.sparkContext.listenerBus.waitUntilEmpty(10000)
-      val executionIds = ctx.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      sparkContext.listenerBus.waitUntilEmpty(10000)
+      val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
       assert(executionIds.size === 1)
       val executionId = executionIds.head
-      val jobs = ctx.listener.getExecution(executionId).get.jobs
+      val jobs = sqlContext.listener.getExecution(executionId).get.jobs
       // Use "<=" because there is a race condition where we may miss some jobs
       // TODO Change "<=" to "=" once we fix the race condition of missing the JobStarted event.
       assert(jobs.size <= 1)
-      val metricValues = ctx.listener.getExecutionMetrics(executionId)
+      val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
       // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
       // However, we still can check the value.
       assert(metricValues.values.toSeq === Seq(2L))
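
The metrics checks above follow one pattern, shown here in isolation as a sketch (it assumes it runs inside a suite like this one, with `sqlContext`, `sparkContext`, and a DataFrame `df` in scope): snapshot the known execution IDs, run the action, wait for the listener bus to drain, and only then inspect the execution that appeared in between.

    val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
    df.collect()                                       // any action that triggers jobs
    sparkContext.listenerBus.waitUntilEmpty(10000)     // let SQLListener process its events
    val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
    assert(executionIds.size === 1)                    // exactly one new SQL execution
    val metricValues = sqlContext.listener.getExecutionMetrics(executionIds.head)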

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 80d1e88..2bbb41c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -74,7 +74,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("basic") {
-    val listener = new SQLListener(ctx)
+    val listener = new SQLListener(sqlContext)
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
@@ -212,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
-    val listener = new SQLListener(ctx)
+    val listener = new SQLListener(sqlContext)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -241,7 +241,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
-    val listener = new SQLListener(ctx)
+    val listener = new SQLListener(sqlContext)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -281,7 +281,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobFailed)") {
-    val listener = new SQLListener(ctx)
+    val listener = new SQLListener(sqlContext)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(

http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d8c9a08..ed71068 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -255,26 +255,26 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   }
 
   test("Basic API") {
-    assert(ctx.read.jdbc(
+    assert(sqlContext.read.jdbc(
       urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
   }
 
   test("Basic API with FetchSize") {
     val properties = new Properties
     properties.setProperty("fetchSize", "2")
-    assert(ctx.read.jdbc(
+    assert(sqlContext.read.jdbc(
       urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
   }
 
   test("Partitioning via JDBCPartitioningInfo API") {
     assert(
-      ctx.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+      sqlContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
       .collect().length === 3)
   }
 
   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(ctx.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+    assert(sqlContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
       .collect().length === 3)
   }
 
@@ -330,9 +330,9 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   }
 
   test("test DATE types") {
-    val rows = ctx.read.jdbc(
+    val rows = sqlContext.read.jdbc(
       urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    val cachedRows = ctx.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val cachedRows = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
       .cache().collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(1).getAs[java.sql.Date](1) === null)
@@ -340,8 +340,8 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   }
 
   test("test DATE types in cache") {
-    val rows = ctx.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    ctx.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val rows = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
+    sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
       .cache().registerTempTable("mycached_date")
     val cachedRows = sql("select * from mycached_date").collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
@@ -349,7 +349,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   }
 
   test("test types for null value") {
-    val rows = ctx.read.jdbc(
+    val rows = sqlContext.read.jdbc(
       urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
     assert((0 to 14).forall(i => rows(0).isNullAt(i)))
   }
@@ -396,7 +396,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
 
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
-    val df = ctx.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+    val df = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
     assert(df.schema.filter(_.dataType != org.apache.spark.sql.types.StringType).isEmpty)
     val rows = df.collect()
     assert(rows(0).get(0).isInstanceOf[String])
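
For reference, the three read.jdbc entry points exercised above, side by side, as a sketch (`urlWithUserAndPass` and the TEST.PEOPLE table are the suite's own H2 fixtures):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("fetchSize", "2")                 // forwarded to the JDBC driver

    // 1. Whole-table read.
    val all = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", props)

    // 2. Range partitioning: (column, lowerBound, upperBound, numPartitions).
    val byRange = sqlContext.read.jdbc(
      urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, props)

    // 3. One partition per explicit WHERE clause.
    val byPredicates = sqlContext.read.jdbc(
      urlWithUserAndPass, "TEST.PEOPLE", Array("THEID < 2", "THEID >= 2"), props)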


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

