spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
Date Mon, 07 Dec 2015 08:22:00 GMT
Repository: spark
Updated Branches:
  refs/heads/master 80a824d36 -> 6fd9e70e3


[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high

We need to make sure that the last entry is indeed the last entry in the queue.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #10110 from brkyvz/batch-wal-test-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fd9e70e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fd9e70e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fd9e70e

Branch: refs/heads/master
Commit: 6fd9e70e3ed43836a0685507fff9949f921234f4
Parents: 80a824d
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Mon Dec 7 00:21:55 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Mon Dec 7 00:21:55 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/util/BatchedWriteAheadLog.scala   |  6 ++++--
 .../spark/streaming/util/WriteAheadLogSuite.scala     | 14 ++++++++++----
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6fd9e70e/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 7158abc..b2cd524 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -166,10 +166,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
       var segment: WriteAheadLogRecordHandle = null
       if (buffer.length > 0) {
         logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
+        // threads may not be able to add items in order by time
+        val sortedByTime = buffer.sortBy(_.time)
         // We take the latest record for the timestamp. Please refer to the class Javadoc for
         // detailed explanation
-        val time = buffer.last.time
-        segment = wrappedLog.write(aggregate(buffer), time)
+        val time = sortedByTime.last.time
+        segment = wrappedLog.write(aggregate(sortedByTime), time)
       }
       buffer.foreach(_.promise.success(segment))
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/6fd9e70e/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index eaa88ea..ef1e89d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     p
   }
 
-  test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") {
+  test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") {
     val blockingWal = new BlockingWriteAheadLog(wal, walHandle)
     val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf)
 
@@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // rest of the records will be batched while it takes time for 3 to get written
     writeAsync(batchedWal, event2, 5L)
     writeAsync(batchedWal, event3, 8L)
-    writeAsync(batchedWal, event4, 12L)
-    writeAsync(batchedWal, event5, 10L)
+    // we would like event 5 to be written before event 4 in order to test that they get
+    // sorted before being aggregated
+    writeAsync(batchedWal, event5, 12L)
+    eventually(timeout(1 second)) {
+      assert(blockingWal.isBlocked)
+      assert(batchedWal.invokePrivate(queueLength()) === 3)
+    }
+    writeAsync(batchedWal, event4, 10L)
     eventually(timeout(1 second)) {
       assert(walBatchingThreadPool.getActiveCount === 5)
       assert(batchedWal.invokePrivate(queueLength()) === 4)
@@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
       // the file name should be the timestamp of the last record, as events should be naturally
       // in order of timestamp, and we need the last element.
       val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
-      verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+      verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L))
       val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
       assert(records.toSet === queuedEvents)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message