From reviews-return-921428-archive-asf-public=cust-asf.ponee.io@spark.apache.org Wed Sep 18 22:40:06 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 24CF8180634 for ; Thu, 19 Sep 2019 00:40:06 +0200 (CEST) Received: (qmail 34971 invoked by uid 500); 18 Sep 2019 22:40:05 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 34960 invoked by uid 99); 18 Sep 2019 22:40:05 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Sep 2019 22:40:05 +0000 From: GitBox To: reviews@spark.apache.org Subject: [GitHub] [spark] HeartSaVioR commented on a change in pull request #25706: [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values Message-ID: <156884640554.30239.6724801799925077887.gitbox@gitbox.apache.org> Date: Wed, 18 Sep 2019 22:40:05 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit HeartSaVioR commented on a change in pull request #25706: [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values URL: https://github.com/apache/spark/pull/25706#discussion_r325922809 ########## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ########## @@ -174,31 +174,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi /** Length of time to wait while draining listener events. 
*/ val WAIT_TIMEOUT_MILLIS = 10000 - val submittedStageInfos = new HashSet[StageInfo] - val successfulStages = new HashSet[Int] - val failedStages = new ArrayBuffer[Int] - val stageByOrderOfExecution = new ArrayBuffer[Int] - val endedTasks = new HashSet[Long] - val sparkListener = new SparkListener() { + /** + * Listener which records some information to verify in UTs. Getter-kind methods in this class + * ensure the value is returned only after there is no event left to process, and that the + * returned value is immutable: this prevents showing odd results due to race conditions. + */ + class EventInfoRecordingListener extends SparkListener { + private val _submittedStageInfos = new HashSet[StageInfo] + private val _successfulStages = new HashSet[Int] + private val _failedStages = new ArrayBuffer[Int] + private val _stageByOrderOfExecution = new ArrayBuffer[Int] + private val _endedTasks = new HashSet[Long] + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { - submittedStageInfos += stageSubmitted.stageInfo + _submittedStageInfos += stageSubmitted.stageInfo } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo - stageByOrderOfExecution += stageInfo.stageId + _stageByOrderOfExecution += stageInfo.stageId if (stageInfo.failureReason.isEmpty) { - successfulStages += stageInfo.stageId + _successfulStages += stageInfo.stageId } else { - failedStages += stageInfo.stageId + _failedStages += stageInfo.stageId } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - endedTasks += taskEnd.taskInfo.taskId + _endedTasks += taskEnd.taskInfo.taskId + } + + def submittedStageInfos: Set[StageInfo] = withWaitingListenerUntilEmpty { + _submittedStageInfos.toSet + } + + def successfulStages: Set[Int] = withWaitingListenerUntilEmpty { + _successfulStages.toSet + } + + def failedStages: List[Int] = withWaitingListenerUntilEmpty { + _failedStages.toList + } + + def stageByOrderOfExecution: List[Int] 
= withWaitingListenerUntilEmpty { + _stageByOrderOfExecution.toList + } + + def endedTask: Set[Long] = withWaitingListenerUntilEmpty { + _endedTasks.toSet + } + + def clear(): Unit = { + _submittedStageInfos.clear() + _successfulStages.clear() + _failedStages.clear() + _stageByOrderOfExecution.clear() + _endedTasks.clear() + } + + private def withWaitingListenerUntilEmpty[T](fn: => T): T = { + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) Review comment: #25837 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org