spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-23040][BACKPORT][CORE] Returns interruptible iterator for shuffle reader
Date Sun, 01 Apr 2018 14:59:01 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 507cff246 -> f1f10da2b


[SPARK-23040][BACKPORT][CORE] Returns interruptible iterator for shuffle reader

Backport https://github.com/apache/spark/pull/20449 and https://github.com/apache/spark/pull/20920
to branch-2.3

---

## What changes were proposed in this pull request?
Before this commit, a non-interruptible iterator was returned from the shuffle reader whenever
an aggregator or an ordering was specified, so such tasks could not be killed mid-iteration.
This commit also ensures that the sorter is closed even when the task is cancelled (killed) in
the middle of sorting.
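
For context, the cancellation hook lives in `hasNext`: Spark's `InterruptibleIterator` consults
the task's kill flag on every call (in this branch via `TaskContext.killTaskIfInterrupted()`).
A rough, self-contained sketch of that behaviour, using toy types rather than the actual
`org.apache.spark.InterruptibleIterator`:

```scala
// Toy sketch: an iterator that aborts promptly once a kill flag is set.
class InterruptibleIteratorSketch[T](isInterrupted: () => Boolean, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    // Checked on every hasNext, so a killed task stops at the next record
    // boundary instead of draining the whole input.
    if (isInterrupted()) throw new RuntimeException("task killed")
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}
```

If the aggregator or sorter fully drains the upstream interruptible iterator and returns a plain
iterator over its own output, this kill check is lost downstream, which is why the patch re-wraps
the final result.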

## How was this patch tested?
Add a unit test in JobCancellationSuite
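With Spark's SBT build, this suite alone can be run via
`./build/sbt "core/testOnly *JobCancellationSuite"` (the usual invocation from the developer
docs; adjust to your environment).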

Author: Xianjin YE <advancedxy@gmail.com>
Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #20954 from jiangxb1987/SPARK-23040-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1f10da2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1f10da2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1f10da2

Branch: refs/heads/branch-2.3
Commit: f1f10da2b4a0913dd230b4a4de95bf05a4672dce
Parents: 507cff2
Author: Xianjin YE <advancedxy@gmail.com>
Authored: Sun Apr 1 22:58:52 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sun Apr 1 22:58:52 2018 +0800

----------------------------------------------------------------------
 .../spark/shuffle/BlockStoreShuffleReader.scala | 14 ++++-
 .../org/apache/spark/JobCancellationSuite.scala | 65 +++++++++++++++++++-
 2 files changed, 77 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1f10da2/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 0562d45..2a77a1c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -95,7 +95,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     }
 
     // Sort the output if there is a sort ordering defined.
-    dep.keyOrdering match {
+    val resultIter = dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
         // Create an ExternalSorter to sort the data.
         val sorter =
@@ -104,9 +104,21 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        // Use completion callback to stop sorter if task was finished/cancelled.
+        context.addTaskCompletionListener(_ => {
+          sorter.stop()
+        })
         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
       case None =>
         aggregatedIter
     }
+
+    resultIter match {
+      case _: InterruptibleIterator[Product2[K, C]] => resultIter
+      case _ =>
+        // Use another interruptible iterator here to support task cancellation, as the aggregator
+        // and/or the sorter may have consumed the previous interruptible iterator.
+        new InterruptibleIterator[Product2[K, C]](context, resultIter)
+    }
   }
 }
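
Design note on the hunk above: the existing CompletionIterator fires its cleanup only when the
sorter's output is fully consumed, so by itself it would leave the sorter open if the task is
killed mid-sort; the added task-completion listener covers that path. A self-contained sketch of
the two paths (toy types, not Spark's CompletionIterator/TaskContext):

```scala
// Toy model of the two cleanup paths in the patch above.
object CleanupPathsSketch {

  class ToySorter {
    var stopped = false
    def stop(): Unit = if (!stopped) { stopped = true; println("sorter stopped") }
    def iterator: Iterator[Int] = Iterator(1, 2, 3)
  }

  // Path 1 (like CompletionIterator): run the cleanup only once the delegate
  // has been fully drained.
  def onExhaustion[T](delegate: Iterator[T])(cleanup: => Unit): Iterator[T] =
    new Iterator[T] {
      def hasNext: Boolean = {
        val has = delegate.hasNext
        if (!has) cleanup
        has
      }
      def next(): T = delegate.next()
    }

  def main(args: Array[String]): Unit = {
    val sorter = new ToySorter

    // Path 2 (like context.addTaskCompletionListener): runs at task end no
    // matter how far iteration got.
    var completionListeners = List.empty[() => Unit]
    completionListeners ::= (() => sorter.stop())

    val iter = onExhaustion(sorter.iterator)(sorter.stop())
    iter.next()                        // "task" is killed after one record,
                                       // so Path 1 never fires...
    completionListeners.foreach(_())   // ...but Path 2 still stops the sorter
  }
}
```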

http://git-wip-us.apache.org/repos/asf/spark/blob/f1f10da2/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 8a77aea..3b793bb 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
@@ -26,7 +27,7 @@ import scala.concurrent.duration._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   override def afterEach() {
     try {
       resetSparkContext()
+      JobCancellationSuite.taskStartedSemaphore.drainPermits()
+      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
+      JobCancellationSuite.twoJobsSharingStageSemaphore.drainPermits()
+      JobCancellationSuite.executionOfInterruptibleCounter.set(0)
     } finally {
       super.afterEach()
     }
@@ -320,6 +325,62 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     f2.get()
   }
 
+  test("interruptible iterator of shuffle reader") {
+    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
+    // execution and a counter is used to make sure that the corresponding tasks are indeed
+    // cancelled.
+    import JobCancellationSuite._
+    sc = new SparkContext("local[2]", "test interruptible iterator")
+
+    val taskCompletedSem = new Semaphore(0)
+
+    sc.addSparkListener(new SparkListener {
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+        // release taskCancelledSemaphore when cancelTasks event has been posted
+        if (stageCompleted.stageInfo.stageId == 1) {
+          taskCancelledSemaphore.release(1000)
+        }
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        if (taskEnd.stageId == 1) { // make sure tasks are completed
+          taskCompletedSem.release()
+        }
+      }
+    })
+
+    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(1))
+      .mapPartitions { iter =>
+        taskStartedSemaphore.release()
+        iter
+      }.foreachAsync { x =>
+        if (x._1 >= 10) {
+          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
+          // next iteration will be cancelled if the source iterator is interruptible. In that
+          // case, the maximum number of increments would be 10 (|1...10|).
+          taskCancelledSemaphore.acquire()
+        }
+        executionOfInterruptibleCounter.getAndIncrement()
+    }
+
+    taskStartedSemaphore.acquire()
+    // Job is cancelled when:
+    // 1. task in reduce stage has been started, guaranteed by previous line.
+    // 2. task in reduce stage is blocked after processing at most 10 records as
+    //    taskCancelledSemaphore is not released until cancelTasks event is posted
+    // After the job is cancelled, the task in the reduce stage will be cancelled as well and no
+    // more iterations are executed.
+    f.cancel()
+
+    val e = intercept[SparkException](f.get()).getCause
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+
+    // Make sure tasks are indeed completed.
+    taskCompletedSem.acquire()
+    assert(executionOfInterruptibleCounter.get() <= 10)
+  }
+
   def testCount() {
     // Cancel before launching any tasks
     {
@@ -381,7 +442,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
 
 object JobCancellationSuite {
+  // To avoid any headaches, reset these global variables in the companion class's afterEach block
   val taskStartedSemaphore = new Semaphore(0)
   val taskCancelledSemaphore = new Semaphore(0)
   val twoJobsSharingStageSemaphore = new Semaphore(0)
+  val executionOfInterruptibleCounter = new AtomicInteger(0)
 }

