spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Date Wed, 18 Apr 2018 23:37:47 GMT
Repository: spark
Updated Branches:
  refs/heads/master a9066478f -> 0c94e48bc


[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an
infinite loop and times out the build.

There were multiple issues with the test:

1. When the test runs alone rather than as part of the suite, the first valid stageId is zero,
so the guard `stageToKill > 0` is never satisfied and the following code waits until it times out:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset,
which ended up canceling the same stage twice and caused the infinite wait (see the sketch below).
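
For illustration, here is a minimal, Spark-free sketch of that race (the object and thread
names are hypothetical, not from the patch): the test thread resets the shared flag, but a
task thread left over from the previous query can write a stale stage id back afterwards.

```
object StageToKillRaceSketch {
  // Stand-in for the removed shared state DataFrameRangeSuite.stageToKill.
  @volatile var stageToKill = -1

  def main(args: Array[String]): Unit = {
    // A task thread from the previous query may still be running when the
    // test thread resets the flag for the next iteration.
    val lateTaskWrite = new Thread(() => stageToKill = 0)

    stageToKill = -1      // test thread: reset before the next query
    lateTaskWrite.start() // task thread: stale write of the old stage id
    lateTaskWrite.join()

    // The listener's guard `stageToKill > 0` never holds for stage 0, and a
    // stale write can re-trigger cancellation of an already finished stage,
    // so the eventually block spins until its timeout.
    println(s"stageToKill after reset: $stageToKill")
  }
}
```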

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill`
and using `wait` and a `CountDownLatch` for synchronization.
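
The actual change is in the diff below; as a rough, Spark-free sketch of the pattern (all
names here are hypothetical): worker threads park in `wait()` so that cancellation can
interrupt them, and the coordinator counts started workers with a `CountDownLatch` instead
of polling a shared variable.

```
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchSketch {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(2) // one count per expected task start
    val lock = new Object

    val workers = (1 to 2).map { i =>
      new Thread(() => {
        latch.countDown()                  // like onTaskStart in the listener
        try lock.synchronized(lock.wait()) // park; interruptible on cancel
        catch { case _: InterruptedException => println(s"worker $i cancelled") }
      })
    }
    workers.foreach(_.start())

    // Deterministic rendezvous instead of polling a shared @volatile var.
    assert(latch.await(20, TimeUnit.SECONDS))
    workers.foreach(_.interrupt()) // plays the role of cancelStage
    workers.foreach(_.join())
  }
}
```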

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20888 from gaborgsomogyi/SPARK-23775.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c94e48b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c94e48b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c94e48b

Branch: refs/heads/master
Commit: 0c94e48bc50717e1627c0d2acd5382d9adc73c97
Parents: a906647
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Authored: Wed Apr 18 16:37:41 2018 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Apr 18 16:37:41 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 78 +++++++++++---------
 1 file changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c94e48b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..a0fd740 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
   }
 
   test("Cancelling stage in a query with Range.") {
-    val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
+    // Save and restore the value because SparkContext is shared
+    val savedInterruptOnCancel = sparkContext
+      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+    try {
+      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+
+      for (codegen <- Seq(true, false)) {
+        // This countdown latch is used to make sure cancelStage is called in the listener for all the stages
+        val latch = new CountDownLatch(2)
+
+        val listener = new SparkListener {
+          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+            sparkContext.cancelStage(taskStart.stageId)
+            latch.countDown()
+          }
         }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
-      }
-    }
 
-    sparkContext.addSparkListener(listener)
-    for (codegen <- Seq(true, false)) {
-      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
-        val ex = intercept[SparkException] {
-          spark.range(0, 100000000000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+        sparkContext.addSparkListener(listener)
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+          val ex = intercept[SparkException] {
+            sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+              x.synchronized {
+                x.wait()
+              }
+              x
+            }.toDF("id").agg(sum("id")).collect()
+          }
+          ex.getCause() match {
+            case null =>
+              assert(ex.getMessage().contains("cancelled"))
+            case cause: SparkException =>
+              assert(cause.getMessage().contains("cancelled"))
+            case cause: Throwable =>
+              fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+          }
         }
-        ex.getCause() match {
-          case null =>
-            assert(ex.getMessage().contains("cancelled"))
-          case cause: SparkException =>
-            assert(cause.getMessage().contains("cancelled"))
-          case cause: Throwable =>
-            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+        latch.await(20, TimeUnit.SECONDS)
+        eventually(timeout(20.seconds)) {
+          assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
         }
+        sparkContext.removeSparkListener(listener)
       }
-      eventually(timeout(20.seconds)) {
-        assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
-      }
+    } finally {
+      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
+        savedInterruptOnCancel)
     }
-    sparkContext.removeSparkListener(listener)
   }
 
   test("SPARK-20430 Initialize Range parameters in a driver side") {
@@ -204,7 +220,3 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually
     }
   }
 }
-
-object DataFrameRangeSuite {
-  @volatile var stageToKill = -1
-}


