spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-20607][CORE] Add new unit tests to ShuffleSuite
Date Fri, 19 May 2017 14:25:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3f2cd51ee -> f398640da


[SPARK-20607][CORE] Add new unit tests to ShuffleSuite

## What changes were proposed in this pull request?

This PR makes two changes:
1. Adds a new unit test verifying that when no shuffle stage has executed,
   shuffle generates neither the data file nor the index file (a condensed
   sketch follows this list).
2. Modifies the '[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file'
   unit test: the parallelism is set to 1 instead of 2, and the shuffle index
   file is now also checked and deleted (sketched after the testing section below).
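
For illustration, a condensed sketch of what the new test asserts, assuming the default sort-based shuffle manager; `conf` is the `SparkConf` fixture from `ShuffleSuite`, and the block ID arguments `(0, 0, 0)` denote shuffle 0, map 0, reduce 0:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// Sketch only (assumes the default sort-based shuffle and the suite's conf):
// before any action runs, no shuffle output exists on disk.
val sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

val dbm = sc.env.blockManager.diskBlockManager
assert(!dbm.getFile(new ShuffleDataBlockId(0, 0, 0)).exists())
assert(!dbm.getFile(new ShuffleIndexBlockId(0, 0, 0)).exists())

rdd.count()  // triggers the shuffle map stage and writes its output

// After the action, the sort-based shuffle has written both files.
assert(dbm.getFile(new ShuffleDataBlockId(0, 0, 0)).exists())
assert(dbm.getFile(new ShuffleIndexBlockId(0, 0, 0)).exists())
```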

## How was this patch tested?
The new and modified unit tests in ShuffleSuite.
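
Correspondingly, the modified [SPARK-4085] test now exercises the rerun path for the index file as well. A condensed sketch of that flow, continuing from the sketch above under the same assumptions:

```scala
// Sketch only: delete the shuffle output that the first count() produced;
// the next action must notice the missing files and rerun the map stage.
val dataFile = dbm.getFile(new ShuffleDataBlockId(0, 0, 0))
val indexFile = dbm.getFile(new ShuffleIndexBlockId(0, 0, 0))
if (dataFile.exists()) { dataFile.delete() }
if (indexFile.exists()) { indexFile.delete() }

rdd.count()  // retries the previous stage and regenerates the shuffle files
```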

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #17868 from heary-cao/ShuffleSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f398640d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f398640d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f398640d

Branch: refs/heads/master
Commit: f398640daa2ba8033f9a31c8f71cad39924e5eac
Parents: 3f2cd51
Author: caoxuewen <cao.xuewen@zte.com.cn>
Authored: Fri May 19 15:25:03 2017 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Fri May 19 15:25:03 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/ShuffleSuite.scala   | 30 ++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f398640d/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58b8659..622f798 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // Delete one of the local shuffle blocks.
     val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
     val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
-    assert(hashFile.exists() || sortFile.exists())
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
 
     if (hashFile.exists()) {
       hashFile.delete()
@@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     if (sortFile.exists()) {
       sortFile.delete()
     }
+    if (indexFile.exists()) {
+      indexFile.delete()
+    }
 
     // This count should retry the execution of the previous stage and rerun shuffle.
     rdd.count()
   }
 
+  test("cannot find its local shuffle file if no execution of the stage and rerun shuffle")
{
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
+
+    // Cannot find one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
+
+    rdd.count()
+
+    // Can find one of the local shuffle blocks.
+    val hashExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleBlockId(0, 0, 0))
+    val sortExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+  }
+
   test("metrics for shuffle without aggregation") {
     sc = new SparkContext("local", "test", conf.clone())
     val numRecords = 10000

