spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-24502][SQL] flaky test: UnsafeRowSerializerSuite
Date Tue, 12 Jun 2018 05:08:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master dc22465f3 -> 01452ea9c


[SPARK-24502][SQL] flaky test: UnsafeRowSerializerSuite

## What changes were proposed in this pull request?

`UnsafeRowSerializerSuite` calls `UnsafeProjection.create` which accesses `SQLConf.get`, while
the current active SparkSession may already be stopped, and we may hit exception like this

```
sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:119)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:286)
	at org.apache.spark.sql.test.TestSparkSession.sessionState$lzycompute(TestSQLContext.scala:42)
	at org.apache.spark.sql.test.TestSparkSession.sessionState(TestSQLContext.scala:41)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:94)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:126)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:54)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:157)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:150)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$unsafeRowConverter(UnsafeRowSerializerSuite.scala:54)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$toUnsafeRow(UnsafeRowSerializerSuite.scala:49)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:63)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:60)
...
```

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21518 from cloud-fan/test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01452ea9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01452ea9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01452ea9

Branch: refs/heads/master
Commit: 01452ea9c75ff027ceeb8314368c6bbedefdb2bf
Parents: dc22465
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Mon Jun 11 22:08:44 2018 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Jun 11 22:08:44 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/LocalSparkSession.scala    |  4 +
 .../execution/UnsafeRowSerializerSuite.scala    | 80 ++++++++------------
 2 files changed, 35 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01452ea9/sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala
index d66a690..cbef1c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala
@@ -30,11 +30,15 @@ trait LocalSparkSession extends BeforeAndAfterEach with BeforeAndAfterAll
{ self
   override def beforeAll() {
     super.beforeAll()
     InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
   }
 
   override def afterEach() {
     try {
       resetSparkContext()
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
     } finally {
       super.afterEach()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/01452ea9/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index a3ae938..d305ce3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,15 +21,13 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
 import java.util.Properties
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
-import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.ExternalSorter
 
 /**
@@ -43,7 +41,7 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStrea
   }
 }
 
-class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
+class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
 
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
     val converter = unsafeRowConverter(schema)
@@ -58,7 +56,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext
{
   }
 
   test("toUnsafeRow() test helper method") {
-    // This currently doesnt work because the generic getter throws an exception.
+    // This currently doesn't work because the generic getter throws an exception.
     val row = Row("Hello", 123)
     val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
     assert(row.getString(0) === unsafeRow.getUTF8String(0).toString)
@@ -97,59 +95,43 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext
{
   }
 
   test("SPARK-10466: external sorter spilling with unsafe row serializer") {
-    var sc: SparkContext = null
-    var outputFile: File = null
-    val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
-    Utils.tryWithSafeFinally {
-      val conf = new SparkConf()
-        .set("spark.shuffle.spill.initialMemoryThreshold", "1")
-        .set("spark.shuffle.sort.bypassMergeThreshold", "0")
-        .set("spark.testing.memory", "80000")
-
-      sc = new SparkContext("local", "test", conf)
-      outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
-      // prepare data
-      val converter = unsafeRowConverter(Array(IntegerType))
-      val data = (1 to 10000).iterator.map { i =>
-        (i, converter(Row(i)))
-      }
-      val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
-      val taskContext = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties,
null)
-
-      val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
-        taskContext,
-        partitioner = Some(new HashPartitioner(10)),
-        serializer = new UnsafeRowSerializer(numFields = 1))
-
-      // Ensure we spilled something and have to merge them later
-      assert(sorter.numSpills === 0)
-      sorter.insertAll(data)
-      assert(sorter.numSpills > 0)
+    val conf = new SparkConf()
+      .set("spark.shuffle.spill.initialMemoryThreshold", "1")
+      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+      .set("spark.testing.memory", "80000")
+    spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
+    val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
+    outputFile.deleteOnExit()
+    // prepare data
+    val converter = unsafeRowConverter(Array(IntegerType))
+    val data = (1 to 10000).iterator.map { i =>
+      (i, converter(Row(i)))
+    }
+    val taskMemoryManager = new TaskMemoryManager(spark.sparkContext.env.memoryManager, 0)
+    val taskContext = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties,
null)
 
-      // Merging spilled files should not throw assertion error
-      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
-    } {
-      // Clean up
-      if (sc != null) {
-        sc.stop()
-      }
+    val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+      taskContext,
+      partitioner = Some(new HashPartitioner(10)),
+      serializer = new UnsafeRowSerializer(numFields = 1))
 
-      // restore the spark env
-      SparkEnv.set(oldEnv)
+    // Ensure we spilled something and have to merge them later
+    assert(sorter.numSpills === 0)
+    sorter.insertAll(data)
+    assert(sorter.numSpills > 0)
 
-      if (outputFile != null) {
-        outputFile.delete()
-      }
-    }
+    // Merging spilled files should not throw assertion error
+    sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
   }
 
   test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
     val conf = new SparkConf().set("spark.shuffle.manager", "sort")
-    sc = new SparkContext("local", "test", conf)
+    spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
     val row = Row("Hello", 123)
     val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
-    val rowsRDD = sc.parallelize(Seq((0, unsafeRow), (1, unsafeRow), (0, unsafeRow)))
-      .asInstanceOf[RDD[Product2[Int, InternalRow]]]
+    val rowsRDD = spark.sparkContext.parallelize(
+      Seq((0, unsafeRow), (1, unsafeRow), (0, unsafeRow))
+    ).asInstanceOf[RDD[Product2[Int, InternalRow]]]
     val dependency =
       new ShuffleDependency[Int, InternalRow, InternalRow](
         rowsRDD,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message