spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-11141][STREAMING] Batch ReceivedBlockTr...
Date Thu, 05 Nov 2015 03:32:41 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9143#discussion_r43973255
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -190,135 +362,197 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
         }
         reader.close()
       }
    +}
     
    -  test("FileBasedWriteAheadLog - write rotating logs") {
    -    // Write data with rotation using WriteAheadLog class
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    +abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String)
     +  extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) {
     
    -    // Read data manually to verify the written data
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    -    assert(writtenData === dataToWrite)
    -  }
    -
    -  test("FileBasedWriteAheadLog - close after write flag") {
    +  import WriteAheadLogSuite._
    +  test(testPrefix + "close after write flag") {
         // Write data with rotation using WriteAheadLog class
         val numFiles = 3
         val dataToWrite = Seq.tabulate(numFiles)(_.toString)
         // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
         writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
    -      closeFileAfterWrite = true)
    +      closeFileAfterWrite = true, allowBatching = allowBatching)
     
         // Read data manually to verify the written data
         val logFiles = getLogFilesInDirectory(testDir)
         assert(logFiles.size === numFiles)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    +    val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching)
         assert(writtenData === dataToWrite)
       }
    +}
     
    -  test("FileBasedWriteAheadLog - read rotating logs") {
    -    // Write data manually for testing reading through WriteAheadLog
    -    val writtenData = (1 to 10).map { i =>
    -      val data = generateRandomData()
    -      val file = testDir + s"/log-$i-$i"
    -      writeDataManually(data, file)
    -      data
    -    }.flatten
    +class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
    +  extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog")
     
    -    val logDirectoryPath = new Path(testDir)
    -    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
    -    assert(fileSystem.exists(logDirectoryPath) === true)
     +/** Tests for the aggregation, deaggregation related methods in the BatchedWriteAheadLog object */
    +class BatchedWriteAheadLogUtilsSuite extends SparkFunSuite {
    +  import BatchedWriteAheadLog._
     
    -    // Read data using manager and verify
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === writtenData)
    -  }
    +  test("serializing and deserializing batched records") {
    +    val events = Seq(
    +      BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)),
    +      BatchAllocationEvent(null, null),
    +      BatchCleanupEvent(Nil)
    +    )
     
    -  test("FileBasedWriteAheadLog - recover past logs when creating new manager") {
    -    // Write data with manager, recover with new manager and verify
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(dataToWrite === readData)
    -  }
     +    val buffers = events.map(e => RecordBuffer(ByteBuffer.wrap(Utils.serialize(e)), 0L, null))
    +    val batched = BatchedWriteAheadLog.aggregate(buffers)
    +    val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer =>
    +      Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array()))
     
    -  test("FileBasedWriteAheadLog - clean old logs") {
    -    logCleanUpTest(waitForCompletion = false)
    +    assert(deaggregate.toSeq === events)
       }
    +}
     
    -  test("FileBasedWriteAheadLog - clean old logs synchronously") {
    -    logCleanUpTest(waitForCompletion = true)
    -  }
    +class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
    +    allowBatching = true,
    +    closeFileAfterWrite = false,
    +    "BatchedWriteAheadLog") {
     
    -  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
    -    // Write data with manager, recover with new manager and verify
    -    val manualClock = new ManualClock
    -    val dataToWrite = generateRandomData()
     -    writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    +  import BatchedWriteAheadLog._
     
    -    writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
    +  // Class that will help us test batching.
    +  private class MockBatchedWriteAheadLog(
    +      parent: WriteAheadLog,
    +      writerThread: Thread = mmock[Thread]) extends BatchedWriteAheadLog(parent) {
     
    -    if (waitForCompletion) {
    -      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -    } else {
    -      eventually(timeout(1 second), interval(10 milliseconds)) {
    -        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    +    override def startBatchedWriterThread(): Thread = writerThread
    +
    +    override def flushRecords(): Unit = {
    +      buffer.append(walWriteQueue.take())
    +      walWriteQueue.drainTo(buffer.asJava)
    +    }
    +
    +    def mockWrite(successful: Boolean): Seq[RecordBuffer] = {
    +      val records = buffer.toSeq
    +      buffer.foreach { case RecordBuffer(byteBuffer, time, promise) =>
    +        if (successful) promise.success(mmock[WriteAheadLogRecordHandle]) else promise.success(null)
           }
    +      buffer.clear()
    +      records
         }
    +
    +    def getQueueLength(): Int = walWriteQueue.size()
       }
     
    -  test("FileBasedWriteAheadLog - handling file errors while reading rotating logs") {
    -    // Generate a set of log files
    -    val manualClock = new ManualClock
    -    val dataToWrite1 = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
    -    val logFiles1 = getLogFilesInDirectory(testDir)
    -    assert(logFiles1.size > 1)
     
    +  private def waitUntilTrue(f: () => Int, value: Int): Boolean = {
    +    val timeOut = 2000
    +    val start = System.currentTimeMillis()
    +    var result = false
    +    while (!result && (System.currentTimeMillis() - start) < timeOut) {
    +      Thread.sleep(50)
    +      result = f() == value
    +    }
    +    result
    +  }
     
    -    // Recover old files and generate a second set of log files
    -    val dataToWrite2 = generateRandomData()
    -    manualClock.advance(100000)
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
    -    val logFiles2 = getLogFilesInDirectory(testDir)
    -    assert(logFiles2.size > logFiles1.size)
    +  import WriteAheadLogSuite._
     
    -    // Read the files and verify that all the written data can be read
    -    val readData1 = readDataUsingWriteAheadLog(testDir)
    -    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
    +  test("BatchedWriteAheadLog - records get added to a queue") {
    +    val numSuccess = new AtomicInteger()
    +    val numFail = new AtomicInteger()
    +    val wal = new MockBatchedWriteAheadLog(mmock[FileBasedWriteAheadLog])
     
    -    // Corrupt the first set of files so that they are basically unreadable
    -    logFiles1.foreach { f =>
    -      val raf = new FileOutputStream(f, true).getChannel()
    -      raf.truncate(1)
    -      raf.close()
    +    def getNumSuccess(): Int = numSuccess.get()
    +    def getNumFail(): Int = numFail.get()
    +
    +    val walBatchingThreadPool = ExecutionContext.fromExecutorService(
     +      ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
    +
    +    def eventFuture(event: String, time: Long): Unit = {
    +      val f = Future(wal.write(event, time))(walBatchingThreadPool)
    +      f.onSuccess { case v =>
    +        if (v != null) numSuccess.incrementAndGet() else numFail.incrementAndGet()
    +      }(walBatchingThreadPool)
         }
     
    -    // Verify that the corrupted files do not prevent reading of the second set of data
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === dataToWrite2)
    +    assert(wal.getQueueLength === 0)
    --- End diff ---
    
    Would be good to add inline comments to give some idea of what's being done.
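
    For example, something along these lines, reusing the helpers from the diff (the inline comments are only a sketch of the intent, since the rest of the test isn't visible in this hunk):

        // Nothing has been written yet, so the internal write queue starts empty.
        assert(wal.getQueueLength === 0)

        // Submit a write through the batching thread pool; the record should be
        // buffered in the queue rather than going straight to the parent log.
        eventFuture("event-1", time = 1L)
        assert(waitUntilTrue(wal.getQueueLength, 1))

        // flushRecords() drains the queued records into the mock writer's buffer...
        wal.flushRecords()
        assert(wal.getQueueLength === 0)

        // ...and completing their promises resolves the pending write futures,
        // which is what the success/failure counters observe.
        wal.mockWrite(successful = true)
        assert(waitUntilTrue(getNumSuccess, 1))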


