kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5829; Only delete producer snapshots before the recovery point
Date Fri, 06 Oct 2017 23:48:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2427a4476 -> 91517e8fb


KAFKA-5829; Only delete producer snapshots before the recovery point

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #4023 from ijuma/kafka-5829-avoid-reading-older-segments-on-hard-shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91517e8f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91517e8f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91517e8f

Branch: refs/heads/trunk
Commit: 91517e8fbd7767ba6d7f43b517f5a26b6f870585
Parents: 2427a44
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Oct 6 16:45:45 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Oct 6 16:45:45 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala  |   3 +-
 core/src/main/scala/kafka/log/Log.scala         | 202 +++++++++++--------
 core/src/main/scala/kafka/log/LogCleaner.scala  |   2 +-
 .../scala/kafka/log/LogCleanerManager.scala     |   9 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  93 +++++----
 .../scala/kafka/log/ProducerStateManager.scala  |  59 +++---
 .../kafka/server/AbstractFetcherThread.scala    |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   1 +
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../checkpoints/OffsetCheckpointFile.scala      |   6 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |   4 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  40 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 200 +++++++++++++-----
 .../kafka/log/ProducerStateManagerTest.scala    |   2 +-
 .../server/AbstractFetcherThreadTest.scala      |   1 +
 .../server/HighwatermarkPersistenceTest.scala   |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  10 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |   3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   6 +-
 20 files changed, 401 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
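
The behavioral change in this patch: producer snapshot files are no longer pruned in Log.flush(). Instead, after the recovery point has been checkpointed, snapshots are deleted only below the smaller of (a) the base offset of the segment preceding the active segment and (b) the base offset of the last segment that starts below the recovery point. A broker that hard-crashes with a stale recovery point therefore still has a snapshot for every segment it must recover and can rebuild producer state in a single pass without reading older segments. The standalone Scala sketch below is illustrative only (the real logic is Log.minSnapshotsOffsetToRetain in the diff); plain strings stand in for segment objects.

    import java.util.concurrent.ConcurrentSkipListMap

    object SnapshotRetentionSketch {

      // `segments` stands in for the log's segment map, keyed by segment base offset
      def minSnapshotOffsetToRetain(segments: ConcurrentSkipListMap[java.lang.Long, String],
                                    activeSegmentBaseOffset: Long,
                                    recoveryPoint: Long): Long = {
        def lowerBaseOffset(offset: Long): Option[Long] =
          Option(segments.lowerEntry(offset)).map(_.getKey.longValue)

        // 1. always retain snapshots for the active segment and the segment before it
        val twoSegmentsMinOffset = lowerBaseOffset(activeSegmentBaseOffset).getOrElse(activeSegmentBaseOffset)
        // 2. never delete snapshots at or above the last segment starting below the recovery point
        val recoveryPointOffset = lowerBaseOffset(recoveryPoint).getOrElse(recoveryPoint)
        math.min(recoveryPointOffset, twoSegmentsMinOffset)
      }

      def main(args: Array[String]): Unit = {
        val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
        Seq(0L, 100L, 200L, 300L).foreach(o => segments.put(o, s"segment-$o"))
        // recovery point 150 falls in the segment starting at 100, so only snapshots
        // below 100 may be deleted, even though the second-latest segment starts at 200
        println(minSnapshotOffsetToRetain(segments, activeSegmentBaseOffset = 300L, recoveryPoint = 150L)) // 100
      }
    }
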


http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index d5e084e..705dc24 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,7 +20,8 @@ package kafka.consumer
 import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
-import kafka.server.{AbstractFetcherThread, PartitionFetchState, ResultWithPartitions}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState}
+import AbstractFetcherThread.ResultWithPartitions
 import kafka.common.{ErrorMapping, TopicAndPartition}
 
 import scala.collection.Map

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d397ca6..a6f76ab 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -192,14 +192,14 @@ class Log(@volatile var dir: File,
   locally {
     val startMs = time.milliseconds
 
-    loadSegments()
+    val nextOffset = loadSegments()
 
     /* Calculate the offset of the next message */
-    nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size)
+    nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
     leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
 
-    logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+    logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     leaderEpochCache.clearAndFlushEarliest(logStartOffset)
@@ -256,10 +256,10 @@ class Log(@volatile var dir: File,
     var swapFiles = Set[File]()
 
     for (file <- dir.listFiles if file.isFile) {
-      if(!file.canRead)
+      if (!file.canRead)
         throw new IOException("Could not read file " + file)
       val filename = file.getName
-      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
+      if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
         // if the file ends in .deleted or .cleaned, delete it
         Files.deleteIfExists(file.toPath)
       } else if(filename.endsWith(SwapFileSuffix)) {
@@ -271,7 +271,7 @@ class Log(@volatile var dir: File,
           Files.deleteIfExists(file.toPath)
         } else if (isLogFile(baseFile)) {
           // delete the index files
-          val offset = offsetFromFilename(baseFile.getName)
+          val offset = offsetFromFile(baseFile)
           Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath)
           Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath)
           Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath)
@@ -287,10 +287,9 @@ class Log(@volatile var dir: File,
     // load segments in ascending order because transactional data from one segment may depend on the
     // segments that come before it
     for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
-      val filename = file.getName
       if (isIndexFile(file)) {
         // if it is an index file, make sure it has a corresponding .log file
-        val offset = offsetFromFilename(filename)
+        val offset = offsetFromFile(file)
         val logFile = Log.logFile(dir, offset)
         if (!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
@@ -298,7 +297,7 @@ class Log(@volatile var dir: File,
         }
       } else if (isLogFile(file)) {
         // if it's a log file, load the corresponding log segment
-        val startOffset = offsetFromFilename(filename)
+        val startOffset = offsetFromFile(file)
         val indexFile = Log.offsetIndexFile(dir, startOffset)
         val timeIndexFile = Log.timeIndexFile(dir, startOffset)
         val txnIndexFile = Log.transactionIndexFile(dir, startOffset)
@@ -334,7 +333,7 @@ class Log(@volatile var dir: File,
           error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           recoverSegment(segment)
         }
-        segments.put(startOffset, segment)
+        addSegment(segment)
       }
     }
   }
@@ -349,6 +348,11 @@ class Log(@volatile var dir: File,
         loadProducersFromLog(stateManager, fetchDataInfo.records)
     }
     stateManager.updateMapEndOffset(segment.baseOffset)
+
+    // take a snapshot for the first recovered segment to avoid reloading all the segments if we shutdown before we
+    // checkpoint the recovery point
+    stateManager.takeSnapshot()
+
     val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
 
     // once we have recovered the segment's data, take a snapshot to ensure that we won't
@@ -361,8 +365,7 @@ class Log(@volatile var dir: File,
   private def completeSwapOperations(swapFiles: Set[File]): Unit = {
     for (swapFile <- swapFiles) {
       val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
-      val filename = logFile.getName
-      val startOffset = offsetFromFilename(filename)
+      val startOffset = offsetFromFile(logFile)
       val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
       val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
@@ -384,9 +387,9 @@ class Log(@volatile var dir: File,
     }
   }
 
-  // Load the log segments from the log files on disk
+  // Load the log segments from the log files on disk and return the next offset
   // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
-  private def loadSegments() {
+  private def loadSegments(): Long = {
     // first do a pass through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles()
@@ -399,59 +402,65 @@ class Log(@volatile var dir: File,
     // before the swap file is restored as the new segment file.
     completeSwapOperations(swapFiles)
 
-    if(logSegments.isEmpty) {
+    if (logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0L, new LogSegment(dir = dir,
-                                      startOffset = 0,
-                                      indexIntervalBytes = config.indexInterval,
-                                      maxIndexSize = config.maxIndexSize,
-                                      rollJitterMs = config.randomSegmentJitter,
-                                      time = time,
-                                      fileAlreadyExists = false,
-                                      initFileSize = this.initFileSize,
-                                      preallocate = config.preallocate))
+      addSegment(new LogSegment(dir = dir,
+        startOffset = 0,
+        indexIntervalBytes = config.indexInterval,
+        maxIndexSize = config.maxIndexSize,
+        rollJitterMs = config.randomSegmentJitter,
+        time = time,
+        fileAlreadyExists = false,
+        initFileSize = this.initFileSize,
+        preallocate = config.preallocate))
+      0
     } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
-      recoverLog()
+      val nextOffset = recoverLog()
       // reset the index size of the currently active log segment to allow more entries
       activeSegment.index.resize(config.maxIndexSize)
       activeSegment.timeIndex.resize(config.maxIndexSize)
-    }
+      nextOffset
+    } else 0
   }
 
   private def updateLogEndOffset(messageOffset: Long) {
     nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
   }
 
-  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
-  private def recoverLog() {
+  /**
+   * Recover the log segments and return the next offset after recovery.
+   *
+   * This method does not need to convert IOException to KafkaStorageException because it is only called before all
+   * logs are loaded.
+   */
+  private def recoverLog(): Long = {
     // if we have the clean shutdown marker, skip recovery
-    if(hasCleanShutdownFile) {
-      this.recoveryPoint = activeSegment.nextOffset()
-      return
-    }
-
-    // okay we need to actually recovery this log
-    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
-    while(unflushed.hasNext) {
-      val segment = unflushed.next
-      info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name))
-      val truncatedBytes =
-        try {
-          recoverSegment(segment, Some(leaderEpochCache))
-        } catch {
-          case _: InvalidOffsetException =>
-            val startOffset = segment.baseOffset
-            warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
-                 "creating an empty one with starting offset " + startOffset)
-            segment.truncateTo(startOffset)
+    if (!hasCleanShutdownFile) {
+      // okay we need to actually recovery this log
+      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
+      while (unflushed.hasNext) {
+        val segment = unflushed.next
+        info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name))
+        val truncatedBytes =
+          try {
+            recoverSegment(segment, Some(leaderEpochCache))
+          } catch {
+            case _: InvalidOffsetException =>
+              val startOffset = segment.baseOffset
+              warn("Found invalid offset during recovery for log " + dir.getName + ". Deleting the corrupt segment and " +
+                "creating an empty one with starting offset " + startOffset)
+              segment.truncateTo(startOffset)
+          }
+        if (truncatedBytes > 0) {
+          // we had an invalid message, delete all remaining log
+          warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name,
+            segment.nextOffset()))
+          unflushed.foreach(deleteSegment)
         }
-      if(truncatedBytes > 0) {
-        // we had an invalid message, delete all remaining log
-        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name,
-          segment.nextOffset()))
-        unflushed.foreach(deleteSegment)
       }
     }
+    recoveryPoint = activeSegment.nextOffset
+    recoveryPoint
   }
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
@@ -476,7 +485,7 @@ class Log(@volatile var dir: File,
       // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
       // last two segments and the last offset. This should avoid the full scan in the case that the log needs
       // truncation.
-      val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset)
+      val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset)
       val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
       offsetsToSnapshot.flatten.foreach { offset =>
         producerStateManager.updateMapEndOffset(offset)
@@ -531,7 +540,7 @@ class Log(@volatile var dir: File,
   /**
    * Check if we have the "clean shutdown" file
    */
-  private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
+  private def hasCleanShutdownFile: Boolean = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.
@@ -1281,15 +1290,12 @@ class Log(@volatile var dir: File,
           file.delete()
         }
 
-        segments.lastEntry() match {
-          case null =>
-          case entry => {
-            val seg = entry.getValue
-            seg.onBecomeInactiveSegment()
-            seg.index.trimToValidSize()
-            seg.timeIndex.trimToValidSize()
-            seg.log.trim()
-          }
+        Option(segments.lastEntry).foreach { entry =>
+          val seg = entry.getValue
+          seg.onBecomeInactiveSegment()
+          seg.index.trimToValidSize()
+          seg.timeIndex.trimToValidSize()
+          seg.log.trim()
         }
 
         // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
@@ -1349,11 +1355,6 @@ class Log(@volatile var dir: File,
       for (segment <- logSegments(this.recoveryPoint, offset))
         segment.flush()
 
-      // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain
-      // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
-      // Otherwise, we would always need to rebuild from the earliest segment.
-      producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
-
       lock synchronized {
         if (offset > this.recoveryPoint) {
           this.recoveryPoint = offset
@@ -1363,17 +1364,42 @@ class Log(@volatile var dir: File,
     }
   }
 
-  def minSnapshotOffsetToRetain(flushedOffset: Long) = {
-    // always retain the producer snapshot from the last two segments. This solves the common case
-    // of truncating to an offset within the active segment, and the rarer case of truncating to the
-    // previous segment just after rolling the new segment.
-    var minSnapshotOffset = activeSegment.baseOffset
-    val previousSegment = segments.lowerEntry(activeSegment.baseOffset)
-    if (previousSegment != null)
-      minSnapshotOffset = previousSegment.getValue.baseOffset
-    math.min(flushedOffset, minSnapshotOffset)
+  /**
+   * Cleanup old producer snapshots after the recovery point is checkpointed. It is useful to retain
+   * the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
+   * Otherwise, we would always need to rebuild from the earliest segment.
+   *
+   * More specifically:
+   *
+   * 1. We always retain the producer snapshot from the last two segments. This solves the common case
+   * of truncating to an offset within the active segment, and the rarer case of truncating to the previous segment.
+   *
+   * 2. We only delete snapshots for offsets less than the recovery point. The recovery point is checkpointed
+   * periodically and it can be behind after a hard shutdown. Since recovery starts from the recovery point, the logic
+   * of rebuilding the producer snapshots in one pass and without loading older segments is simpler if we always
+   * have a producer snapshot for all segments being recovered.
+   *
+   * Return the minimum snapshots offset that was retained.
+   */
+  def deleteSnapshotsAfterRecoveryPointCheckpoint(): Long = {
+    val minOffsetToRetain = minSnapshotsOffsetToRetain
+    producerStateManager.deleteSnapshotsBefore(minOffsetToRetain)
+    minOffsetToRetain
+  }
+
+  // Visible for testing, see `deleteSnapshotsAfterRecoveryPointCheckpoint()` for details
+  private[log] def minSnapshotsOffsetToRetain: Long = {
+    lock synchronized {
+      val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset
+      // Prefer segment base offset
+      val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint)
+      math.min(recoveryPointOffset, twoSegmentsMinOffset)
+    }
   }
 
+  private def lowerSegment(offset: Long): Option[LogSegment] =
+    Option(segments.lowerEntry(offset)).map(_.getValue)
+
   /**
    * Completely delete this log directory and all contents from the file system with no delay
    */
@@ -1478,7 +1504,7 @@ class Log(@volatile var dir: File,
   /**
    * The time this log is last known to have been fully flushed to disk
    */
-  def lastFlushTime(): Long = lastflushedTime.get
+  def lastFlushTime: Long = lastflushedTime.get
 
   /**
    * The active segment that is currently taking appends
@@ -1497,7 +1523,7 @@ class Log(@volatile var dir: File,
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
     lock synchronized {
       val floor = segments.floorKey(from)
-      if(floor eq null)
+      if (floor eq null)
         segments.headMap(to).values.asScala
       else
         segments.subMap(floor, true, to, false).values.asScala
@@ -1638,7 +1664,7 @@ object Log {
   /** a time index file */
   val TimeIndexFileSuffix = ".timeindex"
 
-  val PidSnapshotFileSuffix = ".snapshot"
+  val ProducerSnapshotFileSuffix = ".snapshot"
 
   /** an (aborted) txn index */
   val TxnIndexFileSuffix = ".txnindex"
@@ -1704,7 +1730,7 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def logFile(dir: File, offset: Long) =
+  def logFile(dir: File, offset: Long): File =
     new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
 
   /**
@@ -1722,7 +1748,7 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def offsetIndexFile(dir: File, offset: Long) =
+  def offsetIndexFile(dir: File, offset: Long): File =
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
 
   /**
@@ -1731,7 +1757,7 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def timeIndexFile(dir: File, offset: Long) =
+  def timeIndexFile(dir: File, offset: Long): File =
     new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
 
   /**
@@ -1740,14 +1766,16 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The last offset (exclusive) included in the snapshot
    */
-  def producerSnapshotFile(dir: File, offset: Long) =
-    new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix)
+  def producerSnapshotFile(dir: File, offset: Long): File =
+    new File(dir, filenamePrefixFromOffset(offset) + ProducerSnapshotFileSuffix)
 
-  def transactionIndexFile(dir: File, offset: Long) =
+  def transactionIndexFile(dir: File, offset: Long): File =
     new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix)
 
-  def offsetFromFilename(filename: String): Long =
+  def offsetFromFile(file: File): Long = {
+    val filename = file.getName
     filename.substring(0, filename.indexOf('.')).toLong
+  }
 
   /**
     * Calculate a log's size (in bytes) based on its log segments

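Besides the retention rule, the Log.scala changes adjust recovery itself: recoverSegment() now takes a producer state snapshot at the segment's base offset before recovering it (in addition to the existing snapshot taken after recovery), and loadSegments()/recoverLog() return the next offset instead of having the constructor re-read it from the active segment. A simplified sketch of that ordering, using stand-in types rather than Kafka's LogSegment and ProducerStateManager:

    final case class FakeSegment(baseOffset: Long, nextOffset: Long)

    final class FakeStateManager {
      private var snapshots = Vector.empty[Long]
      private var mapEndOffset = 0L
      def updateMapEndOffset(offset: Long): Unit = mapEndOffset = offset
      // the real manager writes an <offset>.snapshot file; here we just record the offset
      def takeSnapshot(): Unit =
        if (!snapshots.lastOption.contains(mapEndOffset)) snapshots :+= mapEndOffset
      def snapshotOffsets: Vector[Long] = snapshots
    }

    object RecoverySketch {
      // recover every unflushed segment, snapshotting before and after each one, and
      // return the next offset, which becomes the recovery point once recovery completes
      def recoverLog(unflushed: Seq[FakeSegment], stateManager: FakeStateManager): Long = {
        unflushed.foreach { segment =>
          stateManager.updateMapEndOffset(segment.baseOffset)
          stateManager.takeSnapshot() // snapshot before recovering the segment
          // ... segment.recover(stateManager, leaderEpochCache) would run here ...
          stateManager.updateMapEndOffset(segment.nextOffset)
          stateManager.takeSnapshot() // snapshot once the segment is recovered
        }
        unflushed.lastOption.map(_.nextOffset).getOrElse(0L)
      }

      def main(args: Array[String]): Unit = {
        val stateManager = new FakeStateManager
        val nextOffset = recoverLog(Seq(FakeSegment(100, 200), FakeSegment(200, 250)), stateManager)
        println(s"nextOffset=$nextOffset snapshots=${stateManager.snapshotOffsets}") // 250, Vector(100, 200, 250)
      }
    }

With snapshots in place from the start of every recovered segment, a crash during recovery does not force the next restart to reload segments that were already processed.
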
http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d45984b..217c49e 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -87,7 +87,7 @@ import scala.collection.JavaConverters._
  * @param time A way to control the passage of time
  */
 class LogCleaner(val config: CleanerConfig,
-                 val logDirs: Array[File],
+                 val logDirs: Seq[File],
                  val logs: Pool[TopicPartition, Log],
                  val logDirFailureChannel: LogDirFailureChannel,
                  time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index af8707c..e8fe093 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -47,7 +47,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
  *  requested to be resumed.
  */
-private[log] class LogCleanerManager(val logDirs: Array[File],
+private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
 
@@ -59,7 +59,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File],
   private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
 
   /* the offset checkpoints holding the last cleaned point for each log */
-  @volatile private var checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap
+  @volatile private var checkpoints = logDirs.map(dir =>
+    (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
@@ -88,7 +89,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File],
           checkpoint.read()
         } catch {
           case e: KafkaStorageException =>
-            error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e)
+            error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
             Map.empty[TopicPartition, Long]
         }
       }).toMap
@@ -239,7 +240,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File],
           checkpoint.write(existing)
         } catch {
           case e: KafkaStorageException =>
-            error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e)
+            error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 102b1e5..6da494e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -47,8 +47,8 @@ import scala.collection.mutable.ArrayBuffer
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-class LogManager(logDirs: Array[File],
-                 initialOfflineDirs: Array[File],
+class LogManager(logDirs: Seq[File],
+                 initialOfflineDirs: Seq[File],
                  val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                  val defaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
@@ -63,10 +63,11 @@ class LogManager(logDirs: Array[File],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time) extends Logging with KafkaMetricsGroup {
-  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
-  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
+
+  import LogManager._
+
   val LockFile = ".lock"
-  val InitialTaskDelayMs = 30*1000
+  val InitialTaskDelayMs = 30 * 1000
 
   private val logCreationOrDeletionLock = new Object
   private val logs = new Pool[TopicPartition, Log]()
@@ -74,11 +75,11 @@ class LogManager(logDirs: Array[File],
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
 
-  def liveLogDirs: Array[File] = {
+  def liveLogDirs: Seq[File] = {
     if (_liveLogDirs.size == logDirs.size)
       logDirs
     else
-      _liveLogDirs.asScala.toArray
+      _liveLogDirs.asScala.toBuffer
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
@@ -87,9 +88,14 @@ class LogManager(logDirs: Array[File],
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
-  private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains)
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
+  private def offlineLogDirs: Iterable[File] = {
+    val logDirsSet = mutable.Set[File](logDirs: _*)
+    _liveLogDirs.asScala.foreach(logDirsSet -=)
+    logDirsSet
+  }
+
   loadLogs()
 
 
@@ -103,7 +109,7 @@ class LogManager(logDirs: Array[File],
   val offlineLogDirectoryCount = newGauge(
     "OfflineLogDirectoryCount",
     new Gauge[Int] {
-      def value = offlineLogDirs.length
+      def value = offlineLogDirs.size
     }
   )
 
@@ -168,20 +174,22 @@ class LogManager(logDirs: Array[File],
         Exit.halt(1)
       }
 
-      recoveryPointCheckpoints = recoveryPointCheckpoints.filterKeys(file => file.getAbsolutePath != dir)
-      logStartOffsetCheckpoints = logStartOffsetCheckpoints.filterKeys(file => file.getAbsolutePath != dir)
+      recoveryPointCheckpoints = recoveryPointCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir }
+      logStartOffsetCheckpoints = logStartOffsetCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir }
       if (cleaner != null)
         cleaner.handleLogDirFailure(dir)
 
-      val offlineTopicPartitions = logs.filter { case (tp, log) => log.dir.getParent == dir}.map { case (tp, log) => tp }
+      val offlineTopicPartitions = logs.collect {
+        case (tp, log) if log.dir.getParent == dir => tp
+      }
 
-      offlineTopicPartitions.foreach(topicPartition => {
+      offlineTopicPartitions.foreach { topicPartition =>
         val removedLog = logs.remove(topicPartition)
         if (removedLog != null) {
           removedLog.closeHandlers()
           removedLog.removeLogMetrics()
         }
-      })
+      }
       info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
       dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy()))
     }
@@ -206,7 +214,7 @@ class LogManager(logDirs: Array[File],
     }
   }
 
-  private def loadLogs(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
+  private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
     val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
@@ -255,10 +263,7 @@ class LogManager(logDirs: Array[File],
         val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
 
         if (cleanShutdownFile.exists) {
-          debug(
-            "Found clean shutdown file. " +
-              "Skipping recovery for all logs in data directory: " +
-              dir.getAbsolutePath)
+          debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}")
         } else {
           // log recovery itself is being performed by `Log` class during initialization
           brokerState.newState(RecoveringFromUncleanShutdown)
@@ -287,7 +292,7 @@ class LogManager(logDirs: Array[File],
         } yield {
           CoreUtils.runnable {
             try {
-              loadLogs(logDir, recoveryPoints, logStartOffsets)
+              loadLog(logDir, recoveryPoints, logStartOffsets)
             } catch {
               case e: IOException =>
                 offlineDirs.append((dir.getAbsolutePath, e))
@@ -318,10 +323,9 @@ class LogManager(logDirs: Array[File],
         logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
       }
     } catch {
-      case e: ExecutionException => {
+      case e: ExecutionException =>
         error("There was an error in one of the threads during logs loading: " + e.getCause)
         throw e.getCause
-      }
     } finally {
       threadPools.foreach(_.shutdown())
     }
@@ -334,7 +338,7 @@ class LogManager(logDirs: Array[File],
    */
   def startup() {
     /* Schedule the cleanup task to delete old logs */
-    if(scheduler != null) {
+    if (scheduler != null) {
       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
       scheduler.schedule("kafka-log-retention",
                          cleanupLogs _,
@@ -363,7 +367,7 @@ class LogManager(logDirs: Array[File],
                          period = defaultConfig.fileDeleteDelayMs,
                          TimeUnit.MILLISECONDS)
     }
-    if(cleanerConfig.enableCleaner)
+    if (cleanerConfig.enableCleaner)
       cleaner.startup()
   }
 
@@ -506,13 +510,17 @@ class LogManager(logDirs: Array[File],
    * Make a checkpoint for all logs in provided directory.
    */
   private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    val recoveryPoints = this.logsByDir.get(dir.toString)
-    if (recoveryPoints.isDefined) {
+    for {
+      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
+      checkpoint <- recoveryPointCheckpoints.get(dir)
+    } {
       try {
-        this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint)))
+        checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
+        logs.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
+            s"file in directory $dir", e)
       }
     }
   }
@@ -521,12 +529,15 @@ class LogManager(logDirs: Array[File],
    * Checkpoint log start offset for all logs in provided directory.
    */
   private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
-    val logs = this.logsByDir.get(dir.getAbsolutePath)
-    if (logs.isDefined) {
+    for {
+      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
+      checkpoint <- logStartOffsetCheckpoints.get(dir)
+    } {
       try {
-        this.logStartOffsetCheckpoints.get(dir).foreach(_.write(
-          logs.get.filter { case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset }.mapValues(_.logStartOffset)
-        ))
+        val logStartOffsets = partitionToLog.filter { case (_, log) =>
+          log.logStartOffset > log.logSegments.head.baseOffset
+        }.mapValues(_.logStartOffset)
+        checkpoint.write(logStartOffsets)
       } catch {
         case e: IOException =>
           logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
@@ -713,7 +724,7 @@ class LogManager(logDirs: Array[File],
   /**
    * Get all the partition logs
    */
-  def allLogs(): Iterable[Log] = logs.values
+  def allLogs: Iterable[Log] = logs.values
 
   /**
    * Get a map of TopicPartition => Log
@@ -723,10 +734,8 @@ class LogManager(logDirs: Array[File],
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
-  private def logsByDir = {
-    this.logsByTopicPartition.groupBy {
-      case (_, log) => log.dir.getParent
-    }
+  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+    this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent }
   }
 
   // logDir should be an absolute path
@@ -741,7 +750,7 @@ class LogManager(logDirs: Array[File],
   /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
-  private def flushDirtyLogs() = {
+  private def flushDirtyLogs(): Unit = {
     debug("Checking for dirty logs to flush...")
 
     for ((topicPartition, log) <- logs) {
@@ -761,6 +770,8 @@ class LogManager(logDirs: Array[File],
 
 object LogManager {
 
+  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
   val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
 
   def apply(config: KafkaConfig,
@@ -788,8 +799,8 @@ object LogManager {
       backOffMs = config.logCleanerBackoffMs,
       enableCleaner = config.logCleanerEnable)
 
-    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile).toArray,
-      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile).toArray,
+    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
+      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
       topicConfigs = topicConfigs,
       defaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,

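LogManager now drives the snapshot cleanup from the recovery-point checkpointing path rather than from flush: checkpointLogRecoveryOffsetsInDir() writes the checkpoint file first and only then calls deleteSnapshotsAfterRecoveryPointCheckpoint(). A condensed sketch of that ordering with stand-in traits (for brevity it prunes only the logs that were just checkpointed, whereas the patch iterates over all logs):

    import java.io.IOException

    trait CheckpointedLog {
      def recoveryPoint: Long
      def deleteSnapshotsAfterRecoveryPointCheckpoint(): Long
    }

    trait RecoveryPointCheckpoint {
      def write(recoveryPoints: Map[String, Long]): Unit
    }

    object CheckpointSketch {
      // write the recovery points for one log dir, then prune snapshots below the retention
      // floor; if the write fails, the snapshots stay in place so a later recovery can still
      // rebuild producer state without reading segments older than the recovery point
      def checkpointRecoveryOffsets(logsByPartition: Map[String, CheckpointedLog],
                                    checkpoint: RecoveryPointCheckpoint): Unit = {
        try {
          checkpoint.write(logsByPartition.map { case (tp, log) => tp -> log.recoveryPoint })
          logsByPartition.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
        } catch {
          case e: IOException =>
            // the real code reports the directory offline via LogDirFailureChannel here
            System.err.println(s"Disk error while writing recovery point checkpoint: $e")
        }
      }
    }
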
http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7c0a3da..63c1f56 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.nio.file.Files
 
 import kafka.common.KafkaException
-import kafka.log.Log.offsetFromFilename
+import kafka.log.Log.offsetFromFile
 import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.TopicPartition
@@ -417,7 +417,25 @@ object ProducerStateManager {
     }
   }
 
-  private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix)
+  private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix)
+
+  // visible for testing
+  private[log] def listSnapshotFiles(dir: File): Seq[File] = {
+    if (dir.exists && dir.isDirectory) {
+      Option(dir.listFiles).map { files =>
+        files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
+      }.getOrElse(Seq.empty)
+    } else Seq.empty
+  }
+
+  // visible for testing
+  private[log] def deleteSnapshotsBefore(dir: File, offset: Long): Unit = deleteSnapshotFiles(dir, _ < offset)
+
+  private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
+    listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
+      Files.deleteIfExists(file.toPath)
+    }
+  }
 
 }
 
@@ -444,7 +462,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   import ProducerStateManager._
   import java.util
 
-  private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
   private val producers = mutable.Map.empty[Long, ProducerIdEntry]
   private var lastMapOffset = 0L
   private var lastSnapOffset = 0L
@@ -509,7 +526,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
               isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
             }
             loadedProducers.foreach(loadProducerEntry)
-            lastSnapOffset = offsetFromFilename(file.getName)
+            lastSnapOffset = offsetFromFile(file)
             lastMapOffset = lastSnapOffset
             return
           } catch {
@@ -553,9 +570,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
     // remove all out of range snapshots
-    deleteSnapshotFiles { snapOffset =>
+    deleteSnapshotFiles(logDir, { snapOffset =>
       snapOffset > logEndOffset || snapOffset <= logStartOffset
-    }
+    })
 
     if (logEndOffset != mapEndOffset) {
       producers.clear()
@@ -625,12 +642,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFilename(file.getName))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file))
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
   private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = {
     producerIdEntry.removeBatchesOlderThan(logStartOffset)
@@ -689,7 +706,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     producers.clear()
     ongoingTxns.clear()
     unreplicatedTxns.clear()
-    deleteSnapshotFiles()
+    deleteSnapshotFiles(logDir)
     lastSnapOffset = 0L
     lastMapOffset = 0L
   }
@@ -710,25 +727,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   @threadsafe
-  def deleteSnapshotsBefore(offset: Long): Unit = {
-    deleteSnapshotFiles(_ < offset)
-  }
-
-  private def listSnapshotFiles: List[File] = {
-    if (logDir.exists && logDir.isDirectory) {
-      val files = logDir.listFiles
-      if (files != null)
-        files.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
-      else
-        List.empty[File]
-    } else
-      List.empty[File]
-  }
+  def deleteSnapshotsBefore(offset: Long): Unit = ProducerStateManager.deleteSnapshotsBefore(logDir, offset)
 
   private def oldestSnapshotFile: Option[File] = {
     val files = listSnapshotFiles
     if (files.nonEmpty)
-      Some(files.minBy(file => offsetFromFilename(file.getName)))
+      Some(files.minBy(offsetFromFile))
     else
       None
   }
@@ -736,14 +740,11 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   private def latestSnapshotFile: Option[File] = {
     val files = listSnapshotFiles
     if (files.nonEmpty)
-      Some(files.maxBy(file => offsetFromFilename(file.getName)))
+      Some(files.maxBy(offsetFromFile))
     else
       None
   }
 
-  private def deleteSnapshotFiles(predicate: Long => Boolean = _ => true) {
-    listSnapshotFiles.filter(file => predicate(offsetFromFilename(file.getName)))
-      .foreach(file => Files.deleteIfExists(file.toPath))
-  }
+  private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir)
 
 }

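The snapshot-file helpers now live in the ProducerStateManager companion object, take the directory explicitly, and key everything off the offset encoded in the file name. The same pattern in a minimal standalone form, assuming Kafka's "<zero-padded offset>.snapshot" naming:

    import java.io.File
    import java.nio.file.Files

    object SnapshotFilesSketch {
      val SnapshotSuffix = ".snapshot"

      // offset encoded as the file name prefix, e.g. "00000000000000000100.snapshot" -> 100
      def offsetFromFile(file: File): Long = {
        val name = file.getName
        name.substring(0, name.indexOf('.')).toLong
      }

      def listSnapshotFiles(dir: File): Seq[File] =
        if (dir.exists && dir.isDirectory)
          Option(dir.listFiles)
            .map(_.filter(f => f.isFile && f.getName.endsWith(SnapshotSuffix)).toSeq)
            .getOrElse(Seq.empty)
        else Seq.empty

      // delete every snapshot strictly below `offset` (the retention floor computed by the log)
      def deleteSnapshotsBefore(dir: File, offset: Long): Unit =
        listSnapshotFiles(dir).filter(f => offsetFromFile(f) < offset).foreach { f =>
          Files.deleteIfExists(f.toPath)
        }
    }
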
http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 5df6732..d6b9f1b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -301,6 +301,8 @@ abstract class AbstractFetcherThread(name: String,
 
 object AbstractFetcherThread {
 
+  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
+
   trait FetchRequest {
     def isEmpty: Boolean
     def offset(topicPartition: TopicPartition): Long
@@ -418,5 +420,3 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating
 
   override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
 }
-
-case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

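ResultWithPartitions moves from a top-level case class into the AbstractFetcherThread companion object, which is why ConsumerFetcherThread and ReplicaFetcherThread gain "import AbstractFetcherThread.ResultWithPartitions" in this patch. A toy illustration of the pattern (Fetcher and Caller are hypothetical names, not part of Kafka):

    object Fetcher {
      // nesting the case class in the companion object scopes it to the fetcher API
      case class Result[R](result: R, partitionsWithError: Set[String])
    }

    object Caller {
      import Fetcher.Result // call sites import it from the companion object

      def build(): Result[Int] = Result(42, Set.empty)
    }
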
http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aa00565..8eceaa0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2000,7 +2000,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorize(request.session, Describe, Resource.ClusterResource)) {
         val partitions =
           if (describeLogDirsDirRequest.isAllTopicPartitions)
-            replicaManager.logManager.allLogs().map(_.topicPartition).toSet
+            replicaManager.logManager.allLogs.map(_.topicPartition).toSet
           else
             describeLogDirsDirRequest.topicPartitions().asScala
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d422112..52c6de3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util
 
+import AbstractFetcherThread.ResultWithPartitions
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f4f3672..560fdd4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -611,7 +611,7 @@ class ReplicaManager(val config: KafkaConfig,
    *    are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
    */
   def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = {
-    val logsByDir = logManager.allLogs().groupBy(log => log.dir.getParent)
+    val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent)
 
     config.logDirs.toSet.map { logDir: String =>
       val absolutePath = new File(logDir).getAbsolutePath

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 9cd0963..2769cb4 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -52,9 +52,9 @@ trait OffsetCheckpoint {
 /**
   * This class persists a map of (Partition => Offsets) to a file (for a certain replica)
   */
-class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) {
-  val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
-    OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent)
+class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
+  val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
+    OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
 
   def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index a8a2ba7..f0ea50c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -114,7 +114,7 @@ object DumpLogSegments {
           dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
         case Log.TimeIndexFileSuffix =>
           dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
-        case Log.PidSnapshotFileSuffix =>
+        case Log.ProducerSnapshotFileSuffix =>
           dumpProducerIdSnapshot(file)
         case Log.TxnIndexFileSuffix =>
           dumpTxnIndex(file)
@@ -145,7 +145,7 @@ object DumpLogSegments {
   }
 
   private def dumpTxnIndex(file: File): Unit = {
-    val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file)
+    val index = new TransactionIndex(Log.offsetFromFile(file), file)
     for (abortedTxn <- index.allAbortedTxns) {
       println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
         s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 9794b1a..6544d43 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -31,9 +31,9 @@ import org.junit.{After, Before, Test}
 
 class LogManagerTest {
 
-  val time: MockTime = new MockTime()
+  val time = new MockTime()
   val maxRollInterval = 100
-  val maxLogAgeMs = 10*60*1000
+  val maxLogAgeMs = 10 * 60 * 1000
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
@@ -50,7 +50,6 @@ class LogManagerTest {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup()
-    logDir = logManager.liveLogDirs(0)
   }
 
   @After
@@ -58,6 +57,7 @@ class LogManagerTest {
     if (logManager != null)
       logManager.shutdown()
     Utils.delete(logDir)
+    // Some tests assign a new LogManager
     logManager.liveLogDirs.foreach(Utils.delete)
   }
 
@@ -154,7 +154,7 @@ class LogManagerTest {
     time.sleep(log.config.fileDeleteDelayMs + 1)
 
     // there should be a log file, two indexes (the txn index is created lazily),
-    // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments)
+    // the leader epoch checkpoint and two producer snapshot files (one for the active and previous segments)
     assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes)
     try {
@@ -220,7 +220,7 @@ class LogManagerTest {
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
-    val dirs = Array(TestUtils.tempDir(),
+    val dirs = Seq(TestUtils.tempDir(),
                      TestUtils.tempDir(),
                      TestUtils.tempDir())
     logManager.shutdown()
@@ -253,7 +253,7 @@ class LogManagerTest {
    */
   @Test
   def testCheckpointRecoveryPoints() {
-    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager, logDir)
   }
 
   /**
@@ -262,11 +262,9 @@ class LogManagerTest {
   @Test
   def testRecoveryDirectoryMappingWithTrailingSlash() {
     logManager.shutdown()
-    logDir = TestUtils.tempDir()
-    logManager = TestUtils.createLogManager(
-      logDirs = Array(new File(logDir.getAbsolutePath + File.separator)))
+    logManager = TestUtils.createLogManager(logDirs = Seq(new File(TestUtils.tempDir().getAbsolutePath + File.separator)))
     logManager.startup()
-    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
   }
 
   /**
@@ -275,34 +273,30 @@ class LogManagerTest {
   @Test
   def testRecoveryDirectoryMappingWithRelativeDirectory() {
     logManager.shutdown()
-    logDir = new File("data" + File.separator + logDir.getName).getAbsoluteFile
-    logDir.mkdirs()
-    logDir.deleteOnExit()
-    logManager = createLogManager()
+    logManager = createLogManager(Seq(new File("data", logDir.getName).getAbsoluteFile))
     logManager.startup()
-    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager)
+    verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
   }
 
-
-  private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition],
-                                       logManager: LogManager) {
-    val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig))
-    logs.foreach(log => {
+  private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager, logDir: File) {
+    val logs = topicPartitions.map(logManager.getOrCreateLog(_, logConfig))
+    logs.foreach { log =>
       for (_ <- 0 until 50)
         log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
 
       log.flush()
-    })
+    }
 
     logManager.checkpointLogRecoveryOffsets()
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read()
 
     topicPartitions.zip(logs).foreach { case (tp, log) =>
       assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
     }
   }
 
-  private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
+  private def createLogManager(logDirs: Seq[File] = Seq(this.logDir)): LogManager = {
     TestUtils.createLogManager(
       defaultConfig = logConfig,
       logDirs = logDirs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6d40967..1d0fd15 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io._
 import java.nio.ByteBuffer
-import java.nio.file.Files
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
@@ -27,8 +26,8 @@ import kafka.common.KafkaException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
-import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
-import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
+import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -39,7 +38,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 class LogTest {
 
@@ -69,20 +68,20 @@ class LogTest {
   }
 
   @Test
-  def testOffsetFromFilename() {
+  def testOffsetFromFile() {
     val offset = 23423423L
 
     val logFile = Log.logFile(tmpDir, offset)
-    assertEquals(offset, Log.offsetFromFilename(logFile.getName))
+    assertEquals(offset, Log.offsetFromFile(logFile))
 
     val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset)
-    assertEquals(offset, Log.offsetFromFilename(offsetIndexFile.getName))
+    assertEquals(offset, Log.offsetFromFile(offsetIndexFile))
 
     val timeIndexFile = Log.timeIndexFile(tmpDir, offset)
-    assertEquals(offset, Log.offsetFromFilename(timeIndexFile.getName))
+    assertEquals(offset, Log.offsetFromFile(timeIndexFile))
 
     val snapshotFile = Log.producerSnapshotFile(tmpDir, offset)
-    assertEquals(offset, Log.offsetFromFilename(snapshotFile.getName))
+    assertEquals(offset, Log.offsetFromFile(snapshotFile))
   }
 
   /**
@@ -156,26 +155,131 @@ class LogTest {
     // simulate the upgrade path by creating a new log with several segments, deleting the
     // snapshot files, and then reloading the log
     val logConfig = createLogConfig(segmentBytes = 64 * 10)
-    val log = createLog(logDir, logConfig)
+    var log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
     for (i <- 0 to 100) {
       val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
       log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
     }
-
     assertTrue(log.logSegments.size >= 2)
+    val logEndOffset = log.logEndOffset
+    log.close()
+
+    val cleanShutdownFile = createCleanShutdownFile()
+    deleteProducerSnapshotFiles()
+
+    // Reload after clean shutdown
+    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
+    var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    log.close()
+
+    Utils.delete(cleanShutdownFile)
+    deleteProducerSnapshotFiles()
+
+    // Reload after unclean shutdown with recoveryPoint set to log end offset
+    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
+    // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case
+    expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset)
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
     log.close()
 
-    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
-      Files.delete(file.toPath)
+    deleteProducerSnapshotFiles()
+
+    // Reload after unclean shutdown with recoveryPoint set to 0
+    log = createLog(logDir, logConfig, recoveryPoint = 0L)
+    // Note: snapshots end up at the base offset of every segment except the first, plus the log end offset; unclear if intended
+    expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    log.close()
+  }
+
+  @Test
+  def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
+    val logConfig = createLogConfig(segmentBytes = 64 * 10)
+    var log = createLog(logDir, logConfig)
+    assertEquals(None, log.oldestProducerSnapshotOffset)
+
+    for (i <- 0 to 100) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
     }
 
-    val reloadedLog = createLog(logDir, logConfig)
-    val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset)
-    expectedSnapshotsOffsets.foreach { offset =>
-      assertTrue(Log.producerSnapshotFile(logDir, offset).exists)
+    assertTrue(log.logSegments.size >= 5)
+    val segmentOffsets = log.logSegments.toVector.map(_.baseOffset)
+    val activeSegmentOffset = segmentOffsets.last
+
+    // We want the recovery point to fall inside a segment that sits before the last 2 segments, with a gap of
+    // 1 full segment in between. We collect this data before closing the log.
+    val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3)
+    val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4)
+    val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.partition(_ < offsetForRecoveryPointSegment)
+    val recoveryPoint = offsetForRecoveryPointSegment + 1
+    assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint)
+    log.close()
+
+    val segmentsWithReads = ArrayBuffer[LogSegment]()
+    val recoveredSegments = ArrayBuffer[LogSegment]()
+
+    def createLogWithInterceptedReads(recoveryPoint: Long) = {
+      val maxProducerIdExpirationMs = 60 * 60 * 1000
+      val topicPartition = Log.parseTopicPartitionName(logDir)
+      val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+
+      // Intercept all segment read calls
+      new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, mockTime.scheduler,
+        brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs,
+        topicPartition, producerStateManager, new LogDirFailureChannel(10)) {
+
+        override def addSegment(segment: LogSegment): LogSegment = {
+          val wrapper = new LogSegment(segment.log, segment.index, segment.timeIndex, segment.txnIndex, segment.baseOffset,
+            segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
+
+            override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
+                              minOneMessage: Boolean): FetchDataInfo = {
+              segmentsWithReads += this
+              super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage)
+            }
+
+            override def recover(producerStateManager: ProducerStateManager,
+                                 leaderEpochCache: Option[LeaderEpochCache]): Int = {
+              recoveredSegments += this
+              super.recover(producerStateManager, leaderEpochCache)
+            }
+          }
+          super.addSegment(wrapper)
+        }
+      }
     }
+
+    // Retain snapshots for the last 2 segments
+    ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2))
+    log = createLogWithInterceptedReads(offsetForRecoveryPointSegment)
+    // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour)
+    assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset))
+    assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
+    var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    log.close()
+    segmentsWithReads.clear()
+    recoveredSegments.clear()
+
+    // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to
+    // avoid reading all segments
+    ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment)
+    log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
+    assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
+    assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
+    expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+
+    // Verify that we keep 2 snapshot files if we checkpoint the log end offset
+    log.deleteSnapshotsAfterRecoveryPointCheckpoint()
+    expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    log.close()
   }
 
   @Test
@@ -192,7 +296,7 @@ class LogTest {
   }
 
   @Test
-  def testPidMapOffsetUpdatedForNonIdempotentData() {
+  def testProducerIdMapOffsetUpdatedForNonIdempotentData() {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
@@ -343,7 +447,7 @@ class LogTest {
       logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
-    cleanShutdownFile.delete()
+    Utils.delete(cleanShutdownFile)
   }
 
   @Test
@@ -379,11 +483,11 @@ class LogTest {
       logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
-    cleanShutdownFile.delete()
+    Utils.delete(cleanShutdownFile)
   }
 
   @Test
-  def testRebuildPidMapWithCompactedData() {
+  def testRebuildProducerIdMapWithCompactedData() {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
@@ -467,7 +571,7 @@ class LogTest {
   }
 
   @Test
-  def testUpdatePidMapWithCompactedData() {
+  def testUpdateProducerIdMapWithCompactedData() {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     val pid = 1L
@@ -500,7 +604,7 @@ class LogTest {
   }
 
   @Test
-  def testPidMapTruncateTo() {
+  def testProducerIdMapTruncateTo() {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0)
@@ -520,7 +624,7 @@ class LogTest {
   }
 
   @Test
-  def testPidMapTruncateToWithNoSnapshots() {
+  def testProducerIdMapTruncateToWithNoSnapshots() {
     // This ensures that the upgrade optimization path cannot be hit after initial loading
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
@@ -532,10 +636,7 @@ class LogTest {
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid,
       producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
 
-    // Delete all snapshots prior to truncating
-    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
-      Files.delete(file.toPath)
-    }
+    deleteProducerSnapshotFiles()
 
     log.truncateTo(1L)
     assertEquals(1, log.activeProducersWithLastSequence.size)
@@ -612,7 +713,7 @@ class LogTest {
   }
 
   @Test
-  def testPidMapTruncateFullyAndStartAt() {
+  def testProducerIdMapTruncateFullyAndStartAt() {
     val records = TestUtils.singletonRecords("foo".getBytes)
     val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
     val log = createLog(logDir, logConfig)
@@ -634,7 +735,7 @@ class LogTest {
   }
 
   @Test
-  def testPidExpirationOnSegmentDeletion() {
+  def testProducerIdExpirationOnSegmentDeletion() {
     val pid1 = 1L
     val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0)
     val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
@@ -660,7 +761,7 @@ class LogTest {
   }
 
   @Test
-  def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() {
+  def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
@@ -677,9 +778,11 @@ class LogTest {
     log.roll(3L)
     assertEquals(Some(3L), log.latestProducerSnapshotOffset)
 
-    // roll triggers a flush at the starting offset of the new segment. we should
-    // retain the snapshots from the active segment and the previous segment, but
-    // the oldest one should be gone
+    // roll triggers a flush at the starting offset of the new segment; we should retain all snapshots
+    assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
+
+    // retain the snapshots from the active segment and the previous segment, delete the oldest one
+    log.deleteSnapshotsAfterRecoveryPointCheckpoint()
     assertEquals(Some(2L), log.oldestProducerSnapshotOffset)
 
     // even if we flush within the active segment, the snapshot should remain
@@ -729,7 +832,7 @@ class LogTest {
   }
 
   @Test
-  def testPeriodicPidExpiration() {
+  def testPeriodicProducerIdExpiration() {
     val maxProducerIdExpirationMs = 200
     val producerIdExpirationCheckIntervalMs = 100
 
@@ -824,7 +927,7 @@ class LogTest {
   }
 
   @Test
-  def testMultiplePidsPerMemoryRecord() : Unit = {
+  def testMultipleProducerIdsPerMemoryRecord() : Unit = {
     // create a log
     val log = createLog(logDir, LogConfig())
 
@@ -1199,7 +1302,7 @@ class LogTest {
       maxOffset = Some(numMessages + 1)).records
     assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
 
-    // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
+    // check that rolling the log forced a flush; the flush is async, so retry in case of failure
     TestUtils.retry(1000L){
       assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
     }
@@ -1375,7 +1478,8 @@ class LogTest {
     }
     log.close()
 
-    def verifyRecoveredLog(log: Log) {
+    def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long) {
+      assertEquals(s"Unexpected recovery point", expectedRecoveryPoint, log.recoveryPoint)
       assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset)
       assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
       assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
@@ -1385,12 +1489,12 @@ class LogTest {
     }
 
     log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
-    verifyRecoveredLog(log)
+    verifyRecoveredLog(log, lastOffset)
     log.close()
 
     // test recovery case
     log = createLog(logDir, logConfig)
-    verifyRecoveredLog(log)
+    verifyRecoveredLog(log, lastOffset)
     log.close()
   }
 
@@ -1827,7 +1931,7 @@ class LogTest {
     recoveryPoint = log.logEndOffset
     log = createLog(logDir, logConfig)
     assertEquals(recoveryPoint, log.logEndOffset)
-    cleanShutdownFile.delete()
+    Utils.delete(cleanShutdownFile)
   }
 
   @Test
@@ -2584,10 +2688,7 @@ class LogTest {
     appendPid4(4) // 89
     appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
 
-    // delete all snapshot files
-    logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
-      Files.delete(file.toPath)
-    }
+    deleteProducerSnapshotFiles()
 
     // delete the last offset and transaction index files to force recovery. this should force us to rebuild
     // the producer state from the start of the log
@@ -2930,4 +3031,13 @@ class LogTest {
     assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
     cleanShutdownFile
   }
+
+  private def deleteProducerSnapshotFiles(): Unit = {
+    val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix))
+    files.foreach(Utils.delete)
+  }
+
+  private def listProducerSnapshotOffsets: Seq[Long] =
+    ProducerStateManager.listSnapshotFiles(logDir).map(Log.offsetFromFile).sorted
+
 }
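The two producer-snapshot recovery tests above pin down the KAFKA-5829 behaviour: snapshots are only deleted up to the base offset of the segment containing the recovery point, so reloading after an unclean shutdown only has to re-read segments from the recovery point onwards. Below is a minimal standalone sketch of that retention rule; the ".snapshot" suffix and all helper names are assumptions for illustration, not the Log/ProducerStateManager API.

import java.io.File

// Sketch of the post-KAFKA-5829 retention rule: keep every producer snapshot at or above
// the base offset of the segment containing the recovery point, delete the rest.
object SnapshotRetentionSketch {

  val SnapshotSuffix = ".snapshot" // assumed snapshot file suffix

  def offsetFromSnapshotFile(file: File): Long =
    file.getName.stripSuffix(SnapshotSuffix).toLong

  // Assumes segmentBaseOffsets is non-empty and sorted ascending; returns the base offset
  // of the segment that contains the recovery point.
  def recoveryPointSegmentBaseOffset(segmentBaseOffsets: Seq[Long], recoveryPoint: Long): Long =
    segmentBaseOffsets.takeWhile(_ <= recoveryPoint).lastOption.getOrElse(segmentBaseOffsets.head)

  def deleteSnapshotsBefore(logDir: File, cutoff: Long): Unit =
    Option(logDir.listFiles).getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.endsWith(SnapshotSuffix))
      .filter(f => offsetFromSnapshotFile(f) < cutoff)
      .foreach(_.delete())
}

Calling deleteSnapshotsBefore(logDir, recoveryPointSegmentBaseOffset(segmentBaseOffsets, recoveryPoint)) is the hedged analogue of the call sequence exercised in testProducerSnapshotsRecoveryAfterUncleanShutdown.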

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 8650624..67b1b15 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -741,6 +741,6 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   private def currentSnapshotOffsets =
-    logDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet
+    logDir.listFiles.map(Log.offsetFromFile).toSet
 
 }
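Both currentSnapshotOffsets here and testOffsetFromFile in LogTest depend on recovering the offset from a file name. The sketch below shows that round trip under the assumption of Kafka-style zero-padded numeric file names (e.g. 00000000000000023423.snapshot); the pad width and names are assumptions for illustration, not Log.offsetFromFile itself.

import java.io.File

// Round-trip sketch: build a file name from an offset and recover the offset from the name.
object OffsetFileNameSketch {

  // Zero-pad the offset (20 digits assumed) so lexicographic and numeric ordering agree.
  def fileFor(dir: File, offset: Long, suffix: String): File =
    new File(dir, f"$offset%020d$suffix")

  def offsetFromFile(file: File): Long =
    file.getName.takeWhile(_ != '.').toLong
}

For example, offsetFromFile(fileFor(dir, 23423423L, ".snapshot")) yields 23423423L, mirroring the assertions in testOffsetFromFile.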

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index fc49d8c..b95f66c 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import AbstractFetcherThread._
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 79acfa7..1a75f94 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -37,7 +37,7 @@ class HighwatermarkPersistenceTest {
   val zkUtils = EasyMock.createMock(classOf[ZkUtils])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
-      logDirs = config.logDirs.map(new File(_)).toArray,
+      logDirs = config.logDirs.map(new File(_)),
       cleanerConfig = CleanerConfig())
   }
 
@@ -47,7 +47,7 @@ class HighwatermarkPersistenceTest {
 
   @After
   def teardown() {
-    for(manager <- logManagers; dir <- manager.liveLogDirs)
+    for (manager <- logManagers; dir <- manager.liveLogDirs)
       Utils.delete(dir)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b8c78ff..57fe5b5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -67,7 +67,7 @@ class ReplicaManagerTest {
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
@@ -86,7 +86,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
@@ -104,7 +104,7 @@ class ReplicaManagerTest {
   def testIllegalRequiredAcks() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
@@ -133,7 +133,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
     val metadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
@@ -594,7 +594,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
     val metadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 66845c1..23c40fe 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -24,6 +24,7 @@ import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
 import java.io.File
 
+import kafka.log.LogManager
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
 import org.junit.{Before, Test}
@@ -67,7 +68,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     // do a clean shutdown and check that offset checkpoint file exists
     server.shutdown()
     for (logDir <- config.logDirs) {
-      val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile)
+      val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile)
       assertTrue(OffsetCheckpointFile.exists)
       assertTrue(OffsetCheckpointFile.length() > 0)
     }
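The hunk above works because RecoveryPointCheckpointFile is now a constant on the LogManager companion object rather than a val on each instance, so the test no longer needs server.logManager to build the path. A minimal sketch of that shape, using illustrative stand-in names (and an assumed file name value) rather than kafka.log.LogManager:

import java.io.File

class LogManagerLike(val liveLogDirs: Seq[File])

object LogManagerLike {
  // A fixed file name has no per-instance state, so it lives on the companion object.
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"

  // Callers can resolve the checkpoint path without holding a LogManager instance.
  def checkpointFileIn(logDir: File): File =
    new File(logDir, RecoveryPointCheckpointFile)
}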

http://git-wip-us.apache.org/repos/asf/kafka/blob/91517e8f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1149db4..135227f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -836,8 +836,8 @@ object TestUtils extends Logging {
         return
       } catch {
         case e: AssertionError =>
-          val ellapsed = System.currentTimeMillis - startTime
-          if(ellapsed > maxWaitMs) {
+          val elapsed = System.currentTimeMillis - startTime
+          if (elapsed > maxWaitMs) {
             throw e
           } else {
             info("Attempt failed, sleeping for " + wait + ", and then retrying.")
@@ -1019,7 +1019,7 @@ object TestUtils extends Logging {
   /**
    * Create new LogManager instance with default configuration for testing
    */
-  def createLogManager(logDirs: Array[File] = Array.empty[File],
+  def createLogManager(logDirs: Seq[File] = Seq.empty[File],
                        defaultConfig: LogConfig = LogConfig(),
                        cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                        time: MockTime = new MockTime()): LogManager = {


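The TestUtils.retry hunk above only renames the misspelled ellapsed variable. For context, here is a minimal standalone sketch of the retry-until-timeout pattern that loop implements; it is a stand-in, not TestUtils itself.

// Re-run an assertion block until it stops throwing AssertionError or maxWaitMs elapses.
object RetrySketch {

  def retry(maxWaitMs: Long, pauseMs: Long = 100L)(block: => Unit): Unit = {
    val startTime = System.currentTimeMillis
    while (true) {
      try {
        block
        return // the block passed, stop retrying
      } catch {
        case e: AssertionError =>
          val elapsed = System.currentTimeMillis - startTime
          if (elapsed > maxWaitMs) throw e
          else Thread.sleep(pauseMs)
      }
    }
  }
}

Usage mirrors the call site in LogTest: RetrySketch.retry(1000L) { assert(recoveryPoint >= activeSegmentBaseOffset) } keeps retrying until the condition holds or one second has elapsed.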