spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...
Date Fri, 31 Oct 2014 22:16:58 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696840
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.{implicitConversions, postfixOps}
    +import scala.util.Random
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
    +import org.apache.spark.streaming.scheduler._
    +import org.apache.spark.streaming.util.WriteAheadLogSuite._
    +import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +
    +class ReceivedBlockTrackerSuite
    +  extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +
    +  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
    +  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs",
"1")
    +
    +  val hadoopConf = new Configuration()
    +  val akkaTimeout = 10 seconds
    +  val streamId = 1
    +
    +  val allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
    +  var checkpointDirectory: File = null
    +
    +  before {
    +    checkpointDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    allReceivedBlockTrackers.foreach { _.stop() }
    +    if (checkpointDirectory != null && checkpointDirectory.exists()) {
    +      FileUtils.deleteDirectory(checkpointDirectory)
    +      checkpointDirectory = null
    +    }
    +  }
    +
    +  test("block addition, and block to batch allocation") {
    +    val receivedBlockTracker = createTracker(enableCheckpoint = false)
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
    +
    +    val blockInfos = generateBlockInfos()
    +    blockInfos.foreach(receivedBlockTracker.addBlock)
    +
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    +    receivedBlockTracker.allocateBlocksToBatch(1)
    +    receivedBlockTracker.getBlocksOfBatch(1, streamId) shouldEqual blockInfos
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) should have size 0
    +    receivedBlockTracker.allocateBlocksToBatch(2)
    +    receivedBlockTracker.getBlocksOfBatch(2, streamId) should have size 0
    +  }
    +
    +  test("block addition, block to batch allocation and cleanup with write ahead log")
{
    +    val manualClock = new ManualClock
    +    conf.getInt(
    +      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
    +
    +    // Set the time increment level to twice the rotation interval so that every increment
    +    // creates a new log file
    +    val timeIncrementMillis = 2000L
    +    def incrementTime() {
    +      manualClock.addToTime(timeIncrementMillis)
    +    }
    +
    +    // Generate and add blocks to the given tracker
    +    def addBlockInfos(tracker: ReceivedBlockTracker): Seq[ReceivedBlockInfo] = {
    +      val blockInfos = generateBlockInfos()
    +      blockInfos.foreach(tracker.addBlock)
    +      blockInfos
    +    }
    +
    +    // Log the data present in the write ahead log files in the log directory
    +    def printLogFiles(message: String) {
    +      val fileContents = getWriteAheadLogFiles().map { file =>
    +        (s"\n>>>>> $file: <<<<<\n${getWrittenLogData(file).mkString("\n")}")
    +      }.mkString("\n")
    +      logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
    +    }
    +
    +    // Start tracker and add blocks
    +    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    val blockInfos1 = addBlockInfos(tracker1)
    +    tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Verify whether write ahead log has correct contents
    +    val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
    +    getWrittenLogData() shouldEqual expectedWrittenData1
    +    getWriteAheadLogFiles() should have size 1
    +
    +    // Restart tracker and verify recovered list of unallocated blocks
    +    incrementTime()
    +    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Allocate blocks to batch and verify whether the unallocated blocks got allocated
    +    val batchTime1 = manualClock.currentTime
    +    tracker2.allocateBlocksToBatch(batchTime1)
    +    tracker2.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +
    +    // Add more blocks and allocate to another batch
    +    incrementTime()
    +    val batchTime2 = manualClock.currentTime
    +    val blockInfos2 = addBlockInfos(tracker2)
    +    tracker2.allocateBlocksToBatch(batchTime2)
    +    tracker2.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +
    +    // Verify whether log has correct contents
    +    val expectedWrittenData2 = expectedWrittenData1 ++
    +      Seq(createBatchAllocation(batchTime1, blockInfos1)) ++
    +      blockInfos2.map(BlockAdditionEvent) ++
    +      Seq(createBatchAllocation(batchTime2, blockInfos2))
    +    getWrittenLogData() shouldEqual expectedWrittenData2
    +
    +    // Restart tracker and verify recovered state
    +    incrementTime()
    +    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +    tracker3.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +    tracker3.getUnallocatedBlocks(streamId) shouldBe empty
    +
    +    // Cleanup first batch but not second batch
    +    val oldestLogFile = getWriteAheadLogFiles().head
    +    incrementTime()
    +    tracker3.cleanupOldBatches(batchTime2)
    +
    +    // Verify that the batch allocations have been cleaned up, and that the
    +    // cleanup event has been written to the log
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual Seq.empty
    +    getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
    +
    +    // Verify that at least one log file gets deleted
    +    eventually(timeout(10 seconds), interval(10 milliseconds)) {
    --- End diff --
    
    Removed.
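    
    For reference, the `eventually` clause above is ScalaTest's polling
    assertion: it retries the enclosed block at the given interval until the
    block passes or the timeout expires. A minimal, self-contained sketch of
    that pattern follows; the suite name, file path, and assertion are
    hypothetical and not taken from this PR.
    
        import java.io.File
    
        import scala.language.postfixOps
    
        import org.scalatest.FunSuite
        import org.scalatest.concurrent.Eventually._
        import org.scalatest.time.SpanSugar._
    
        // Hypothetical suite illustrating the retry-until-timeout pattern used above.
        class EventuallyPatternSuite extends FunSuite {
    
          test("a file is eventually deleted") {
            val file = new File("/tmp/hypothetical-old-log-file") // illustrative path
            // Re-run the block every 10 milliseconds until it passes,
            // failing the test if it still fails after 10 seconds.
            eventually(timeout(10 seconds), interval(10 milliseconds)) {
              assert(!file.exists(), s"$file should have been deleted")
            }
          }
        }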


