spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite
Date Mon, 05 Jan 2015 05:09:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6c726a3fb -> 27e7f5a72


[SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite

Because `sparkEnv.blockManager.master.removeBlock` is asynchronous, we need to make sure the
block has already been removed before calling `super.enqueueSuccessfulTask`.

Author: zsxwing <zsxwing@gmail.com>

Closes #3894 from zsxwing/SPARK-5083 and squashes the following commits:

d97c03d [zsxwing] Fix a flaky test in TaskResultGetterSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27e7f5a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27e7f5a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27e7f5a7

Branch: refs/heads/master
Commit: 27e7f5a7237d9d64a3b2c8a030ba3e3a9a96b26c
Parents: 6c726a3
Author: zsxwing <zsxwing@gmail.com>
Authored: Sun Jan 4 21:09:21 2015 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sun Jan 4 21:09:21 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskResultGetterSuite.scala | 22 ++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27e7f5a7/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 3aab5a1..e3a3803 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -19,7 +19,12 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
@@ -34,6 +39,8 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
+  @volatile var removeBlockSuccessfully = false
+
   override def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     if (!removedResult) {
@@ -42,6 +49,15 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
       serializer.get().deserialize[TaskResult[_]](serializedData) match {
         case IndirectTaskResult(blockId, size) =>
           sparkEnv.blockManager.master.removeBlock(blockId)
+          // removeBlock is asynchronous. Need to wait it's removed successfully
+          try {
+            eventually(timeout(3 seconds), interval(200 milliseconds)) {
+              assert(!sparkEnv.blockManager.master.contains(blockId))
+            }
+            removeBlockSuccessfully = true
+          } catch {
+            case NonFatal(e) => removeBlockSuccessfully = false
+          }
         case directResult: DirectTaskResult[_] =>
           taskSetManager.abort("Internal error: expect only indirect results")
       }
@@ -92,10 +108,12 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSpark
         assert(false, "Expect local cluster to use TaskSchedulerImpl")
         throw new ClassCastException
     }
-    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    scheduler.taskResultGetter = resultGetter
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(resultGetter.removeBlockSuccessfully)
     assert(result === 1.to(akkaFrameSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message