spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks
Date Fri, 13 Nov 2015 02:03:27 GMT
Repository: spark
Updated Branches:
  refs/heads/master 0f1d00a90 -> 7786f9cc0


[SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks

The support for closing WriteAheadLog files after every write was just merged in. Closing each
file after a write is a very expensive operation, as it creates many small files on S3. It is
not necessary to enable it on HDFS anyway.

However, when there are many small files on S3, recovery takes a very long time. In addition,
files stack up quickly and deletes may not be able to keep up, so deletes should be
parallelized as well.

This PR adds the two parallelization steps mentioned above, and also fixes a couple of other
failures I encountered during recovery.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #9373 from brkyvz/par-recovery.
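
For context, the mode whose recovery this change speeds up is the close-file-after-write mode
mentioned above. A minimal sketch of enabling it (the configuration keys are assumed from the
streaming write-ahead-log utilities and are not part of this diff):

    import org.apache.spark.SparkConf

    // Sketch only: close WAL files after every write (useful on S3, unnecessary on HDFS).
    // Recovering the many resulting small files is what this patch parallelizes.
    val sparkConf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")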


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7786f9cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7786f9cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7786f9cc

Branch: refs/heads/master
Commit: 7786f9cc0790d27854a1e184f66a9b4df4d040a2
Parents: 0f1d00a
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Thu Nov 12 18:03:23 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Nov 12 18:03:23 2015 -0800

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      |  6 +-
 .../streaming/util/FileBasedWriteAheadLog.scala | 78 +++++++++++------
 .../FileBasedWriteAheadLogRandomReader.scala    |  2 +-
 .../util/FileBasedWriteAheadLogReader.scala     | 17 +++-
 .../apache/spark/streaming/util/HdfsUtils.scala | 24 +++++-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 91 +++++++++++++++++++-
 .../streaming/util/WriteAheadLogSuite.scala     | 87 +++++++++++++++++--
 7 files changed, 268 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2480b4e..1ed6fb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
-    // First, stop receiving
-    receiverTracker.stop(processAllReceivedData)
+    if (receiverTracker != null) {
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
+    }
 
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bc3f248..72705f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,10 +17,12 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ThreadPoolTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.postfixOps
 
@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
   private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
 
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
-  implicit private val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+  private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
+  private val executionContext = ExecutionContext.fromExecutorService(threadpool)
   override protected val logName = s"WriteAheadLogManager $callerNameTag"
 
   private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@ private[streaming] class FileBasedWriteAheadLog(
    */
   def readAll(): JIterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
-    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-
-    logFilesToRead.iterator.map { file =>
+    logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+    def readFile(file: String): Iterator[ByteBuffer] = {
       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
       CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
-    }.flatten.asJava
+    }
+    if (!closeFileAfterWrite) {
+      logFilesToRead.iterator.map(readFile).flatten.asJava
+    } else {
+      // For performance gains, it makes sense to parallelize the recovery if
+      // closeFileAfterWrite = true
+      seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+    }
   }
 
   /**
@@ -146,30 +154,33 @@ private[streaming] class FileBasedWriteAheadLog(
    * asynchronously.
    */
   def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
-    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    val oldLogFiles = synchronized {
+      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
+      pastLogs --= expiredLogs
+      expiredLogs
+    }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
 
-    def deleteFiles() {
-      oldLogFiles.foreach { logInfo =>
-        try {
-          val path = new Path(logInfo.path)
-          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
-          fs.delete(path, true)
-          synchronized { pastLogs -= logInfo }
-          logDebug(s"Cleared log file $logInfo")
-        } catch {
-          case ex: Exception =>
-            logWarning(s"Error clearing write ahead log file $logInfo", ex)
-        }
+    def deleteFile(walInfo: LogInfo): Unit = {
+      try {
+        val path = new Path(walInfo.path)
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+        fs.delete(path, true)
+        logDebug(s"Cleared log file $walInfo")
+      } catch {
+        case ex: Exception =>
+          logWarning(s"Error clearing write ahead log file $walInfo", ex)
       }
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
-    if (!executionContext.isShutdown) {
-      val f = Future { deleteFiles() }
-      if (waitForCompletion) {
-        import scala.concurrent.duration._
-        Await.ready(f, 1 second)
+    oldLogFiles.foreach { logInfo =>
+      if (!executionContext.isShutdown) {
+        val f = Future { deleteFile(logInfo) }(executionContext)
+        if (waitForCompletion) {
+          import scala.concurrent.duration._
+          Await.ready(f, 1 second)
+        }
       }
     }
   }
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
       }
     }.sortBy { _.startTime }
   }
+
+  /**
+   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
+   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
+   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
+   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
+   */
+  def seqToParIterator[I, O](
+      tpool: ThreadPoolExecutor,
+      source: Seq[I],
+      handler: I => Iterator[O]): Iterator[O] = {
+    val taskSupport = new ThreadPoolTaskSupport(tpool)
+    val groupSize = tpool.getMaximumPoolSize.max(8)
+    source.grouped(groupSize).flatMap { group =>
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
+    }.flatten
+  }
 }
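
To make the effect of `seqToParIterator` concrete, here is a self-contained sketch of the same
grouping trick (adapted from the method above; plain Scala 2.11 with its parallel collections,
no Spark classes assumed). Walking the input in groups of roughly the pool size bounds how many
handler results (in the WAL case, open log readers) exist at any moment:

    import java.util.concurrent.{Executors, ThreadPoolExecutor}
    import scala.collection.parallel.ThreadPoolTaskSupport

    object SeqToParIteratorSketch {
      // Same shape as FileBasedWriteAheadLog.seqToParIterator: the outer iterator pulls one
      // group at a time, and only that group's handlers run in parallel on the shared pool.
      def seqToParIterator[I, O](
          tpool: ThreadPoolExecutor,
          source: Seq[I],
          handler: I => Iterator[O]): Iterator[O] = {
        val taskSupport = new ThreadPoolTaskSupport(tpool)
        val groupSize = tpool.getMaximumPoolSize.max(8)
        source.grouped(groupSize).flatMap { group =>
          val parallelCollection = group.par
          parallelCollection.tasksupport = taskSupport
          parallelCollection.map(handler)
        }.flatten
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(4).asInstanceOf[ThreadPoolExecutor]
        // The handler just doubles a number here; in the WAL it opens a log reader per file.
        val out = seqToParIterator(pool, 1 to 20, (i: Int) => Iterator(i * 2))
        println(out.toList)  // List(2, 4, 6, ..., 40)
        pool.shutdown()
      }
    }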

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index f716822..56d4977 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf:
   extends Closeable {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're opening the stream
 
   def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
     assertOpen()

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index c3bb59f..a375c07 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
 import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
   extends Iterator[ByteBuffer] with Closeable with Logging {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're opening the stream
   private var nextItem: Option[ByteBuffer] = None
 
   override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
           logDebug("Error reading next item, EOF reached", e)
           close()
           false
+        case e: IOException =>
+          logWarning("Error while trying to read data. If the file was deleted, " +
+            "this should be okay.", e)
+          close()
+          if (HdfsUtils.checkFileExists(path, conf)) {
+            // If file exists, this could be a legitimate error
+            throw e
+          } else {
+          // File was deleted. This can occur when the daemon cleanup thread takes time to
+            // delete the file during recovery.
+            false
+          }
+
         case e: Exception =>
           logWarning("Error while trying to read data from HDFS.", e)
           close()

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index f60688f..13a765d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.IOException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
-    val instream = dfs.open(dfsPath)
-    instream
+    if (dfs.isFile(dfsPath)) {
+      try {
+        dfs.open(dfsPath)
+      } catch {
+        case e: IOException =>
+          // If we are really unlucky, the file may be deleted as we're opening the stream.
+          // This can happen as clean up is performed by daemon threads that may be left over from
+          // previous runs.
+          if (!dfs.isFile(dfsPath)) null else throw e
+      }
+    } else {
+      null
+    }
   }
 
   def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@ private[streaming] object HdfsUtils {
       case _ => fs
     }
   }
+
+  /** Check if the file exists at the given path. */
+  def checkFileExists(path: String, conf: Configuration): Boolean = {
+    val hdpPath = new Path(path)
+    val fs = getFileSystemForPath(hdpPath, conf)
+    fs.isFile(hdpPath)
+  }
 }
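
A short, hypothetical usage sketch of the new `getInputStream` contract (the caller is placed in
the same package because `HdfsUtils` is `private[streaming]`; the path is made up): a null
return means the file disappeared between listing and open, and callers treat it as an empty
log rather than an error, matching `closed = (instream == null)` in the readers above.

    package org.apache.spark.streaming.util  // required: HdfsUtils is private[streaming]

    import org.apache.hadoop.conf.Configuration

    object NullStreamSketch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        val path = "file:///tmp/wal/log-0-1000"  // hypothetical WAL file path
        val instream = HdfsUtils.getInputStream(path, hadoopConf)
        if (instream == null) {
          // Deleted by a leftover cleanup thread (or never written): nothing to recover.
          println(s"$path is gone; skipping")
        } else {
          try {
            println(s"recovering length-prefixed records from $path")
          } finally {
            instream.close()
          }
        }
      }
    }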

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index f793a12..7db17ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
+import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
     tracker1.isWriteAheadLogEnabled should be (false)
   }
 
+  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
+    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+    val addBlocks = generateBlockInfos()
+    val batch1 = addBlocks.slice(0, 1)
+    val batch2 = addBlocks.slice(1, 3)
+    val batch3 = addBlocks.slice(3, addBlocks.length)
+
+    assert(getWriteAheadLogFiles().length === 0)
+
+    // list of timestamps for files
+    val t = Seq.tabulate(5)(i => i * 1000)
+
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 1)
+
+    // The goal is to create several log files which should have been cleaned up.
+    // If we face any issue during recovery, because these old files exist, then we need to make
+    // deletion more robust rather than a parallelized operation where we fire and forget
+    val batch1Allocation = createBatchAllocation(t(1), batch1)
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+
+    writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))
+
+    val batch2Allocation = createBatchAllocation(t(3), batch2)
+    writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)
+
+    writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))
+
+    // We should have 5 different log files as we called `writeEventsManually` with 5 different
+    // timestamps
+    assert(getWriteAheadLogFiles().length === 5)
+
+    // Create the tracker to recover from the log files. We're going to ask the tracker to clean
+    // things up, and then we're going to rewrite that data, and recover using a different tracker.
+    // They should have identical data no matter what
+    val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+
+    def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
+      subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(3), streamId))
+      subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(1), streamId))
+      subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
+    }
+
+    // ask the tracker to clean up some old files
+    tracker.cleanupOldBatches(t(3), waitForCompletion = true)
+    assert(getWriteAheadLogFiles().length === 3)
+
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker2)
+
+    // rewrite first file
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 4)
+    // make sure trackers are consistent
+    val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker3)
+
+    // rewrite second file
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+    assert(getWriteAheadLogFiles().length === 5)
+    // make sure trackers are consistent
+    val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+    compareTrackers(tracker, tracker4)
+  }
+
   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
    * want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@ class ReceivedBlockTrackerSuite
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
+  /**
+   * Write received block tracker events to a file manually.
+   */
+  def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
+    val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
+    events.foreach { event =>
+      val bytes = Utils.serialize(event)
+      writer.writeInt(bytes.size)
+      writer.write(bytes)
+    }
+    writer.close()
+  }
+
   /** Get all the data written in the given write ahead log file. */
   def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
     getWrittenLogData(Seq(logFile))
   }
 
+  /** Get the log file name for the given log start time. */
+  def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
+    checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
+      File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
+  }
+
   /**
   * Get all the data written in the given write ahead log files. By default, it will read all
    * files in the test log directory.

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 9e13f25..4273fd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -32,15 +33,13 @@ import org.apache.hadoop.fs.Path
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.concurrent.Eventually
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{ThreadUtils, ManualClock, Utils}
+import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, Utils}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 
 /** Common tests for WriteAheadLogs that we would like to test with different configurations. */
@@ -198,6 +197,64 @@ class FileBasedWriteAheadLogSuite
 
   import WriteAheadLogSuite._
 
+  test("FileBasedWriteAheadLog - seqToParIterator") {
+    /*
+     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
+     files. This causes recovery to take a very long time. In order to make it quicker, we
+     parallelized the reading of these files. This test makes sure that we limit the number of
+     open files to the number of threads in our thread pool rather than the size of the list
+     of files.
+     */
+    val numThreads = 8
+    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
+    class GetMaxCounter {
+      private val value = new AtomicInteger()
+      @volatile private var max: Int = 0
+      def increment(): Unit = synchronized {
+        val atInstant = value.incrementAndGet()
+        if (atInstant > max) max = atInstant
+      }
+      def decrement(): Unit = synchronized { value.decrementAndGet() }
+      def get(): Int = synchronized { value.get() }
+      def getMax(): Int = synchronized { max }
+    }
+    try {
+      // If Jenkins is slow, we may not have a chance to run many threads simultaneously. Having
+      // a latch will make sure that all the threads can be launched altogether.
+      val latch = new CountDownLatch(1)
+      val testSeq = 1 to 1000
+      val counter = new GetMaxCounter()
+      def handle(value: Int): Iterator[Int] = {
+        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
+          counter.increment()
+          // block so that other threads also launch
+          latch.await(10, TimeUnit.SECONDS)
+          override def completion() { counter.decrement() }
+        }
+      }
+      @volatile var collected: Seq[Int] = Nil
+      val t = new Thread() {
+        override def run() {
+          // run the calculation on a separate thread so that we can release the latch
+          val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
+          collected = iterator.toSeq
+        }
+      }
+      t.start()
+      eventually(Eventually.timeout(10.seconds)) {
+        // make sure we are doing a parallel computation!
+        assert(counter.getMax() > 1)
+      }
+      latch.countDown()
+      t.join(10000)
+      assert(collected === testSeq)
+      // make sure we didn't open too many Iterators
+      assert(counter.getMax() <= numThreads)
+    } finally {
+      tpool.shutdownNow()
+    }
+  }
+
   test("FileBasedWriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
     val segments = writeDataUsingWriter(testFile, dataToWrite)
@@ -259,6 +316,26 @@ class FileBasedWriteAheadLogSuite
     assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
   }
 
+  test("FileBasedWriteAheadLogReader - handles errors when file doesn't exist") {
+    // Write data manually for testing the sequential reader
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val tFile = new File(testFile)
+    assert(tFile.exists())
+    // Verify the data can be read and is same as the one correctly written
+    assert(readDataUsingReader(testFile) === dataToWrite)
+
+    tFile.delete()
+    assert(!tFile.exists())
+
+    val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
+    assert(!reader.hasNext)
+    reader.close()
+
+    // Verify that no exception is thrown if file doesn't exist
+    assert(readDataUsingReader(testFile) === Nil)
+  }
+
   test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
@@ -581,7 +658,7 @@ object WriteAheadLogSuite {
       closeFileAfterWrite: Boolean,
       allowBatching: Boolean): Seq[String] = {
     val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
-    val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+    val data = wal.readAll().asScala.map(byteBufferToString).toArray
     wal.close()
     data
   }
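
The `.toSeq` to `.toArray` change just above is likely about laziness: in the Scala versions
used here, `Iterator#toSeq` is backed by a lazy Stream, so records could be pulled only after
`wal.close()` has already run, whereas `.toArray` forces the read before the log is closed. A
tiny illustration of that difference (plain Scala 2.11, independent of this patch):

    // Sketch: Iterator#toSeq is Stream-backed and lazy; Iterator#toArray is eager.
    object LazyToSeqSketch {
      def main(args: Array[String]): Unit = {
        var reads = 0
        def source(): Iterator[Int] = Iterator.tabulate(3) { i => reads += 1; i }

        val lazySeq = source().toSeq     // a Stream: only the head is forced here
        println(reads)                   // 1; the remaining elements are read on demand
        println(lazySeq.length)          // forcing the Stream performs the remaining reads

        reads = 0
        val eagerArr = source().toArray  // all elements are read immediately
        println(reads)                   // 3
        println(eagerArr.toList)         // List(0, 1, 2)
      }
    }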


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

