From: tdas@apache.org
To: commits@spark.apache.org
Message-Id: <9f132dd1a42f452489d3cdd63b7add0e@git.apache.org>
Subject: spark git commit: [SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite
Date: Tue, 18 Oct 2016 07:50:04 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 1c5a7d7f6 -> 7d878cf2d


[SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite

This work has largely been done by lw-lin in his PR #15497. This is a slight refactoring of it.

## What changes were proposed in this pull request?
There were two sources of flakiness in the StreamingQueryListener test.

- When testing with a manual clock, consecutive attempts to advance the clock can occur without the stream execution thread being unblocked and doing some work between the two attempts. Hence the following can happen with the current ManualClock.
```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed !
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed !
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|        _isWaiting = false         |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

- The second source of flakiness is that data added to the memory stream may get processed in any trigger, not just the first trigger.

My fix is to make the manual clock wait for the stream execution thread to start waiting for the clock at the right wait start time. That is, `advance(200)` (see above) will wait for the stream execution thread to complete the wait that started at time 0, and start a new wait at time 100 (i.e. the timestamp after the previous `advance(100)`).

In addition, since this is a feature that is solely used by StreamExecution, I removed all the non-generic code from ManualClock and put it in StreamManualClock inside StreamTest.

## How was this patch tested?
Ran the existing unit tests MANY TIMES in Jenkins

Author: Tathagata Das
Author: Liwei Lin

Closes #15519 from tdas/metrics-flaky-test-fix.

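The handshake described above can be sketched outside Spark as follows. This is only an illustration under stated assumptions: `SketchManualClock`, `SketchStreamClock`, `HandshakeDemo` and `awaitCond` are hypothetical names, the base clock is a simplified stand-in for `org.apache.spark.util.ManualClock` (which is `private[spark]`), and `awaitCond` is a crude substitute for the `eventually` helper used in `StreamTest`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for org.apache.spark.util.ManualClock.
class SketchManualClock(private var time: Long = 0L) {
  def getTimeMillis(): Long = synchronized { time }

  def advance(timeToAdd: Long): Unit = synchronized {
    time += timeToAdd
    notifyAll()
  }

  def waitTillTime(targetTime: Long): Long = synchronized {
    while (time < targetTime) wait(10)
    time
  }
}

// Same idea as StreamManualClock in this commit: record the time at which the current wait began.
class SketchStreamClock(start: Long = 0L) extends SketchManualClock(start) {
  private var waitStartTime: Option[Long] = None

  override def waitTillTime(targetTime: Long): Long = synchronized {
    try {
      waitStartTime = Some(getTimeMillis())
      super.waitTillTime(targetTime)
    } finally {
      waitStartTime = None
    }
  }

  def isStreamWaitingAt(t: Long): Boolean = synchronized { waitStartTime.contains(t) }
}

object HandshakeDemo {
  // Poll until the condition holds, failing after 5 seconds (assumed timeout).
  private def awaitCond(cond: => Boolean): Unit = {
    val deadline = System.currentTimeMillis() + 5000
    while (!cond) {
      require(System.currentTimeMillis() < deadline, "condition not met in time")
      Thread.sleep(1)
    }
  }

  def run(): List[Long] = {
    val clock = new SketchStreamClock()
    val wakeUps = ArrayBuffer.empty[Long] // written by the stream thread, read after join()

    // Plays the StreamExecution thread: three consecutive waits.
    val stream = new Thread(() => {
      for (target <- Seq(100L, 200L, 300L)) wakeUps += clock.waitTillTime(target)
    })
    stream.start()

    // Plays the testing thread: advance only once a wait that began at the expected time exists.
    var expected = 0L
    for (_ <- 1 to 3) {
      awaitCond(clock.isStreamWaitingAt(expected))
      clock.advance(100)
      expected += 100
    }
    stream.join()
    wakeUps.toList
  }
}
```

Because each `advance` is gated on a wait that started at the expected time, the stream thread in this sketch observes one advance per wait, rather than waking up once after several advances have already been applied.
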
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d878cf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d878cf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d878cf2

Branch: refs/heads/master
Commit: 7d878cf2da04800bc4147b05610170865b148c64
Parents: 1c5a7d7
Author: Liwei Lin
Authored: Tue Oct 18 00:49:57 2016 -0700
Committer: Tathagata Das
Committed: Tue Oct 18 00:49:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ManualClock.scala     | 18 ++--------
 .../spark/sql/streaming/StreamSuite.scala       |  4 +--
 .../apache/spark/sql/streaming/StreamTest.scala | 38 ++++++++++++++++----
 .../streaming/StreamingQueryListenerSuite.scala |  8 ++---
 4 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index 91a9587..e7a65d7 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -26,8 +26,6 @@ package org.apache.spark.util
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
 
-  private var _isWaiting = false
-
   /**
    * @return `ManualClock` with initial time 0
    */
@@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    * @return current time reported by the clock when waiting finishes
    */
   def waitTillTime(targetTime: Long): Long = synchronized {
-    _isWaiting = true
-    try {
-      while (time < targetTime) {
-        wait(10)
-      }
-      getTimeMillis()
-    } finally {
-      _isWaiting = false
+    while (time < targetTime) {
+      wait(10)
     }
+    getTimeMillis()
   }
-
-  /**
-   * Returns whether there is any thread being blocked in `waitTillTime`.
-   */
-  def isWaiting: Boolean = synchronized { _isWaiting }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index cdbad90..6bdf479 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -161,7 +161,7 @@ class StreamSuite extends StreamTest {
     val inputData = MemoryStream[Int]
 
     testStream(inputData.toDS())(
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
 
       /* -- batch 0 ----------------------- */
       // Add some data in batch 0
@@ -199,7 +199,7 @@ class StreamSuite extends StreamTest {
 
       /* Stop then restart the Stream  */
       StopStream,
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
       /* -- batch 1 rerun ----------------- */
       // this batch 1 would re-run because the latest batch id logged in offset log is 1

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 3b9d378..254f823 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
   case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
     extends StreamAction
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+    private var waitStartTime: Option[Long] = None
+
+    override def waitTillTime(targetTime: Long): Long = synchronized {
+      try {
+        waitStartTime = Some(getTimeMillis())
+        super.waitTillTime(targetTime)
+      } finally {
+        waitStartTime = None
+      }
+    }
+
+    def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) }
+  }
+
 
   /**
    * Executes the specified actions on the given streaming DataFrame and provides helpful
@@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val testThread = Thread.currentThread()
     val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val statusCollector = new QueryStatusCollector
-
+    var manualClockExpectedTime = -1L
     try {
       spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
@@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         action match {
           case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
+            verify(triggerClock.isInstanceOf[SystemClock]
+              || triggerClock.isInstanceOf[StreamManualClock],
+              "Use either SystemClock or StreamManualClock to start the stream")
+            if (triggerClock.isInstanceOf[StreamManualClock]) {
+              manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
+            }
             lastStream = currentStream
             currentStream =
               spark
@@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
               "can not advance manual clock when a stream is not running")
-            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+            verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
               s"can not advance clock of type ${currentStream.triggerClock.getClass}")
-            val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            assert(manualClockExpectedTime >= 0)
             // Make sure we don't advance ManualClock too early. See SPARK-16002.
-            eventually("ManualClock has not yet entered the waiting state") {
-              assert(clock.isWaiting)
+            eventually("StreamManualClock has not yet entered the waiting state") {
+              assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
-            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+            clock.advance(timeToAdd)
+            manualClockExpectedTime += timeToAdd
+            verify(clock.getTimeMillis() === manualClockExpectedTime,
+              s"Unexpected clock time after updating: " +
+              s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}")
 
           case StopStream =>
             verify(currentStream != null, "can not stop a stream that is not running")

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 9e0eefb..623f66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Make sure we don't leak any events to the next test
   }
 
-  ignore("single listener, check trigger statuses") {
+  test("single listener, check trigger statuses") {
     import StreamingQueryListenerSuite._
-    clock = new ManualClock()
+    clock = new StreamManualClock
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
     val inputData = new MemoryStream[Int](0, sqlContext) {
@@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
         // Check the correctness of the trigger info of the last completed batch reported by
         // onQueryProgress
-        assert(status.triggerDetails.get("triggerId") == "0")
+        assert(status.triggerDetails.containsKey("triggerId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
 
@@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org