spark-reviews mailing list archives

From JoshRosen <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...
Date Thu, 25 Dec 2014 10:29:30 GMT
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22271345
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a long time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    This Thread.sleep call was to ensure that we're actually inside a `fileStream.map` task
    when the context shuts down. It might not be necessary given the `waitForTotalBatchesStarted`
    call on the previous line. AFAIK, the reason I had this here was that `waitForTotalBatchesStarted`
    might fire after streaming starts the batch but before the underlying Spark jobs start. If
    we do need to block until Spark itself actually begins the processing, that could be a
    little trickier. I suppose I could just use a SparkListener for that, as sketched below.
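    
    For reference, here's a minimal sketch of what that SparkListener approach might look like
    (the listener class and the wiring are hypothetical, not part of this PR):
    
    ```scala
    import java.util.concurrent.CountDownLatch
    
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
    
    // Hypothetical listener: counts down a latch once the first task starts,
    // so the test can block until Spark itself has begun processing.
    class TaskStartListener extends SparkListener {
      val firstTaskStarted = new CountDownLatch(1)
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        firstTaskStarted.countDown()
      }
    }
    
    // Assumed wiring in the test:
    //   val listener = new TaskStartListener
    //   ssc.sparkContext.addSparkListener(listener)
    //   ssc.start()
    //   listener.firstTaskStarted.await()  // replaces the Thread.sleep(1000)
    ```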
    
    I toyed around with the idea of using a semaphore, but that's tricky because you need to
    ensure that the task and the caller share the same Semaphore object in the JVM. Since the
    task is serialized, you effectively need a global object with a hashmap that holds the
    semaphore, which is a bit messy.
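    
    For what it's worth, a rough sketch of that global-hashmap-of-semaphores idea (all names
    here are made up for illustration; this only works in local mode, where the driver and
    the tasks share a single JVM):
    
    ```scala
    import java.util.concurrent.Semaphore
    
    import scala.collection.mutable
    
    // Hypothetical JVM-wide registry: the task's closure captures only the string
    // key, which serializes cleanly; both sides look up the same Semaphore here.
    object TestSemaphores {
      private val semaphores = mutable.Map.empty[String, Semaphore]
      def getOrCreate(key: String): Semaphore = synchronized {
        semaphores.getOrElseUpdate(key, new Semaphore(0))
      }
    }
    
    // Inside the task:  TestSemaphores.getOrCreate("batch3").release()
    // In the test:      TestSemaphores.getOrCreate("batch3").acquire()  // blocks until the task runs
    ```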


