spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd avioding driver OOM
Date Mon, 28 Mar 2016 23:56:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master a916d2a45 -> ad9e3d50f


[SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd avioding driver OOM

## What changes were proposed in this pull request?

We have a streaming job using `FlumePollInputStream` always driver OOM after few days, here
is some driver heap dump before OOM
```
 num     #instances         #bytes  class name
----------------------------------------------
   1:      13845916      553836640  org.apache.spark.storage.BlockStatus
   2:      14020324      336487776  org.apache.spark.storage.StreamBlockId
   3:      13883881      333213144  scala.collection.mutable.DefaultEntry
   4:          8907       89043952  [Lscala.collection.mutable.HashEntry;
   5:         62360       65107352  [B
   6:        163368       24453904  [Ljava.lang.Object;
   7:        293651       20342664  [C
...
```
`BlockStatus` and `StreamBlockId` keep on growing, and the driver OOM in the end.
After investigated, i found the `executorIdToStorageStatus` in `StorageStatusListener` seems
never remove the blocks from `StorageStatus`.
In order to fix the issue, i try to use `onBlockUpdated` replace `onTaskEnd ` , so we can
update the block informations(add blocks, drop the block from memory to disk and delete the
blocks) in time.

## How was this patch tested?

Existing unit tests and manual tests

Author: jeanlyn <jeanlyn92@gmail.com>

Closes #11779 from jeanlyn/fix_driver_oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad9e3d50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad9e3d50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad9e3d50

Branch: refs/heads/master
Commit: ad9e3d50f71b096872806a86d89c03a208b1cf8b
Parents: a916d2a
Author: jeanlyn <jeanlyn92@gmail.com>
Authored: Mon Mar 28 16:56:25 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Mon Mar 28 16:56:25 2016 -0700

----------------------------------------------------------------------
 .../spark/storage/StorageStatusListener.scala   | 21 +++---
 .../apache/spark/ui/storage/StorageTab.scala    | 21 +++---
 .../executor_list_json_expectation.json         |  4 +-
 .../rdd_list_storage_json_expectation.json      | 10 +--
 .../deploy/history/HistoryServerSuite.scala     |  5 +-
 .../storage/StorageStatusListenerSuite.scala    | 67 +++++++++++---------
 .../spark/ui/storage/StorageTabSuite.scala      | 58 ++++++++---------
 7 files changed, 91 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index f552b49..3008520 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
     }
   }
 
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    val info = taskEnd.taskInfo
-    val metrics = taskEnd.taskMetrics
-    if (info != null && metrics != null) {
-      val updatedBlocks = metrics.updatedBlockStatuses
-      if (updatedBlocks.length > 0) {
-        updateStorageStatus(info.executorId, updatedBlocks)
-      }
-    }
-  }
-
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized
{
     updateStorageStatus(unpersistRDD.rddId)
   }
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
       }
     }
   }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+    val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+    val blockId = blockUpdated.blockUpdatedInfo.blockId
+    val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+    val memSize = blockUpdated.blockUpdatedInfo.memSize
+    val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+    val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+    updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 8f75b58..5009583 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends
Bloc
     StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
   }
 
-  /**
-   * Assumes the storage status list is fully up-to-date. This implies the corresponding
-   * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
-   */
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    val metrics = taskEnd.taskMetrics
-    if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
-      updateRDDInfo(metrics.updatedBlockStatuses)
-    }
-  }
-
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized
{
     val rddInfos = stageSubmitted.stageInfo.rddInfos
     rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends
Bloc
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized
{
     _rddInfoMap.remove(unpersistRDD.rddId)
   }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+    super.onBlockUpdated(blockUpdated)
+    val blockId = blockUpdated.blockUpdatedInfo.blockId
+    val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+    val memSize = blockUpdated.blockUpdatedInfo.memSize
+    val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+    val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+    updateRDDInfo(Seq((blockId, blockStatus)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 4a88eee..efc8659 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -2,8 +2,8 @@
   "id" : "<driver>",
   "hostPort" : "localhost:57971",
   "isActive" : true,
-  "rddBlocks" : 8,
-  "memoryUsed" : 28000128,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
   "diskUsed" : 0,
   "totalCores" : 0,
   "maxTasks" : 0,

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
b/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
index f79a310..8878e54 100644
--- a/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/rdd_list_storage_json_expectation.json
@@ -1,9 +1 @@
-[ {
-  "id" : 0,
-  "name" : "0",
-  "numPartitions" : 8,
-  "numCachedPartitions" : 8,
-  "storageLevel" : "Memory Deserialized 1x Replicated",
-  "memoryUsed" : 28000128,
-  "diskUsed" : 0
-} ]
\ No newline at end of file
+[ ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 5822261..79e4efb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -140,8 +140,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
Matchers
     "stage task list from multi-attempt app json(2)" ->
       "applications/local-1426533911241/2/stages/0/0/taskList",
 
-    "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
-    "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
+    "rdd list storage json" -> "applications/local-1422981780767/storage/rdd"
+    // Todo: enable this test when logging the even of onBlockUpdated. See: SPARK-13845
+    // "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
   )
 
   // run a bunch of characterization tests -- just verify the behavior is the same as what
is saved

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 14daa00..9835f11 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -82,48 +82,51 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }
 
-  test("task end with updated blocks") {
+  test("updated blocks") {
     val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
-    val taskMetrics1 = new TaskMetrics
-    val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
-
-    // Task end with new blocks
+
+    val blockUpdateInfos1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+    )
+    val blockUpdateInfos2 =
+      Seq(BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+
+    // Add some new blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+    postUpdateBlock(listener, blockUpdateInfos1)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+    postUpdateBlock(listener, blockUpdateInfos2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
 
-    // Task end with dropped blocks
-    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(droppedBlock1, droppedBlock3))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(droppedBlock2, droppedBlock3))
+    // Dropped the blocks
+    val droppedBlockInfo1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+    )
+    val droppedBlockInfo2 = Seq(
+      BlockUpdatedInfo(bm2, RDDBlockId(1, 2), StorageLevel.NONE, 0L, 0L),
+      BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+    )
 
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+    postUpdateBlock(listener, droppedBlockInfo1)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+    postUpdateBlock(listener, droppedBlockInfo2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -134,15 +137,14 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   test("unpersist RDD") {
     val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
-    val taskMetrics1 = new TaskMetrics
-    val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
+    val blockUpdateInfos1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+    )
+    val blockUpdateInfos2 =
+      Seq(BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+    postUpdateBlock(listener, blockUpdateInfos1)
+    postUpdateBlock(listener, blockUpdateInfos2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
 
     // Unpersist RDD
@@ -155,4 +157,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
   }
+
+  private def postUpdateBlock(
+      listener: StorageStatusListener, updateBlockInfos: Seq[BlockUpdatedInfo]): Unit = {
+    updateBlockInfos.foreach { updateBlockInfo =>
+      listener.onBlockUpdated(SparkListenerBlockUpdated(updateBlockInfo))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad9e3d50/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 6b7c538..7d77dee 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -106,7 +106,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.size === 0)
   }
 
-  test("task end") {
+  test("block update") {
     val myRddInfo0 = rddInfo0
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
@@ -120,19 +120,13 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(1).isCached)
     assert(!storageListener._rddInfoMap(2).isCached)
 
-    // Task end with no updated blocks. This should not change anything.
-    bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
-    assert(storageListener._rddInfoMap.size === 3)
-    assert(storageListener.rddInfoList.size === 0)
-
-    // Task end with a few new persisted blocks, some from the same RDD
-    val metrics1 = new TaskMetrics
-    metrics1.setUpdatedBlockStatuses(Seq(
-      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
-      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
-      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
-    ))
-    bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
+    // Some blocks updated
+    val blockUpdateInfos = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 100), memAndDisk, 400L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 101), memAndDisk, 0L, 400L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 20), memAndDisk, 0L, 240L)
+    )
+    postUpdateBlocks(bus, blockUpdateInfos)
     assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
@@ -144,15 +138,14 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(2).isCached)
     assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
 
-    // Task end with a few dropped blocks
-    val metrics2 = new TaskMetrics
-    metrics2.setUpdatedBlockStatuses(Seq(
-      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
-      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
-      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
-      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
-    ))
-    bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
+    // Drop some blocks
+    val blockUpdateInfos2 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 100), none, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 20), none, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(2, 40), none, 0L, 0L), // doesn't actually exist
+      BlockUpdatedInfo(bm1, RDDBlockId(4, 80), none, 0L, 0L) // doesn't actually exist
+    )
+    postUpdateBlocks(bus, blockUpdateInfos2)
     assert(storageListener._rddInfoMap(0).memSize === 0L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
@@ -169,24 +162,27 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
     val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
     val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
-    val taskMetrics0 = new TaskMetrics
-    val taskMetrics1 = new TaskMetrics
-    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
-    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
-    taskMetrics0.setUpdatedBlockStatuses(Seq(block0))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1))
+    val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
+    val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L))
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener.rddInfoList.size === 0)
-    bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
+    postUpdateBlocks(bus, blockUpdateInfos1)
     assert(storageListener.rddInfoList.size === 1)
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener.rddInfoList.size === 1)
     bus.postToAll(SparkListenerStageCompleted(stageInfo0))
     assert(storageListener.rddInfoList.size === 1)
-    bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
+    postUpdateBlocks(bus, blockUpdateInfos2)
     assert(storageListener.rddInfoList.size === 2)
     bus.postToAll(SparkListenerStageCompleted(stageInfo1))
     assert(storageListener.rddInfoList.size === 2)
   }
+
+  private def postUpdateBlocks(
+      bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
+    blockUpdateInfos.foreach { blockUpdateInfo =>
+      bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message