kafka-commits mailing list archives

From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-6324; Change LogSegment.delete to deleteIfExists and harden log recovery
Date Tue, 12 Dec 2017 08:00:44 GMT
KAFKA-6324; Change LogSegment.delete to deleteIfExists and harden log recovery

- Rename `delete()` to `deleteIfExists()` in `LogSegment`, `AbstractIndex`
and `TxnIndex`. An exception is now thrown in case of I/O errors, so failures
are reported more informatively and are less likely to be ignored; a `boolean`
return value covers the case where the file does not exist (like
`Files.deleteIfExists()`). A minimal usage sketch follows this list.
- Fix an instance of deleting a file while it is still open (should fix
KAFKA-6322 and KAFKA-6075).
- `LogSegment.deleteIfExists` no longer throws an exception if any of
the files it tries to delete does not exist (fixes KAFKA-6194).
- Remove unnecessary `FileChannel.force(true)` when deleting a file.
- Introduce `LogSegment.open()` and use it to improve encapsulation
and reduce duplication.
- Expand functionality of `LogSegment.onBecomeInactiveSegment()`
to reduce duplication and improve encapsulation.
- Use `AbstractIndex.deleteIfExists()` instead of deleting files manually.
- Improve logging when deleting swap files.
- Use `CorruptIndexException` instead of `IllegalArgumentException`.
- Simplify `LogCleaner.cleanSegments()` to reduce duplication and
improve encapsulation.
- A few other clean-ups in `Log`, `LogSegment`, etc.
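
For illustration only, here is a minimal, REPL-style sketch (not part of this
commit) of how a caller might use the renamed and newly introduced APIs. The
`segmentDir` path is a hypothetical stand-in and the default `LogConfig()` is
used in place of a real broker config:

    import java.io.{File, IOException}
    import kafka.log.{LogConfig, LogSegment}
    import org.apache.kafka.common.utils.Time

    // Hypothetical inputs; in the broker these come from Log and its LogConfig.
    val segmentDir = new File("/tmp/kafka-logs/demo-0")
    val config = LogConfig()
    val time = Time.SYSTEM

    // LogSegment.open (introduced by this commit) builds the log file and all
    // three indexes from the directory, base offset and config.
    val segment = LogSegment.open(dir = segmentDir, baseOffset = 0L, config, time = time,
      fileAlreadyExists = false)

    // deleteIfExists() mirrors Files.deleteIfExists(): a missing file is only
    // logged, while a real I/O failure surfaces as an IOException instead of
    // being silently swallowed behind a boolean return value.
    try segment.deleteIfExists()
    catch {
      case e: IOException =>
        // handle or rethrow; the error is no longer ignored
        throw e
    }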

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Ted Yu <yuzhihong@gmail.com>

Closes #4040 from ijuma/kafka-5829-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5cd34d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5cd34d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5cd34d7

Branch: refs/heads/trunk
Commit: a5cd34d7962ff5da9b99d0229ef5a9a5fcb3f318
Parents: 72eec0a
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Dec 12 10:00:05 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Dec 12 10:00:05 2017 +0200

----------------------------------------------------------------------
 .../apache/kafka/common/record/FileRecords.java |   9 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |  24 ++-
 .../scala/kafka/log/CorruptIndexException.scala |  20 ++
 core/src/main/scala/kafka/log/Log.scala         | 204 ++++++++-----------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  58 +++---
 core/src/main/scala/kafka/log/LogSegment.scala  | 164 +++++++++------
 core/src/main/scala/kafka/log/OffsetIndex.scala |  12 +-
 core/src/main/scala/kafka/log/TimeIndex.scala   |  19 +-
 .../main/scala/kafka/log/TransactionIndex.scala |  33 ++-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  24 +++
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  39 ++--
 .../scala/unit/kafka/log/LogManagerTest.scala   |   6 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  69 +++----
 .../src/test/scala/unit/kafka/log/LogTest.scala |  56 ++---
 .../scala/unit/kafka/log/TimeIndexTest.scala    |  41 ++++
 .../unit/kafka/log/TransactionIndexTest.scala   |   4 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   1 -
 .../scala/unit/kafka/utils/CoreUtilsTest.scala  |  51 +++++
 .../scala/unit/kafka/utils/MockScheduler.scala  |   8 +-
 19 files changed, 513 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 36591a0..fd9b4be 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -30,6 +30,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
+import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -178,11 +179,13 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
     /**
      * Delete this message set from the filesystem
-     * @return True iff this message set was deleted.
+     * @throws IOException if deletion fails due to an I/O error
+     * @return  {@code true} if the file was deleted by this method; {@code false} if the file could not be deleted
+     *          because it did not exist
      */
-    public boolean delete() {
+    public boolean deleteIfExists() throws IOException {
         Utils.closeQuietly(channel, "FileChannel");
-        return file.delete();
+        return Files.deleteIfExists(file.toPath());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 9696d8d..44083c1 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io.{File, RandomAccessFile}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.nio.channels.FileChannel
+import java.nio.file.Files
 import java.util.concurrent.locks.{Lock, ReentrantLock}
 
 import kafka.log.IndexSearchType.IndexSearchEntity
@@ -156,10 +157,13 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   }
 
   /**
-   * Delete this index file
+   * Delete this index file.
+   *
+   * @throws IOException if deletion fails due to an I/O error
+   * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+   *         not exist
    */
-  def delete(): Boolean = {
-    info(s"Deleting index ${file.getAbsolutePath}")
+  def deleteIfExists(): Boolean = {
     inLock(lock) {
       // On JVM, a memory mapping is typically unmapped by garbage collector.
       // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
@@ -167,7 +171,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
       safeForceUnmap()
     }
-    file.delete()
+    Files.deleteIfExists(file.toPath)
   }
 
   /**
@@ -199,14 +203,14 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /**
    * Do a basic sanity check on this index to detect obvious problems
    *
-   * @throws IllegalArgumentException if any problems are found
+   * @throws CorruptIndexException if any problems are found
    */
   def sanityCheck(): Unit
 
   /**
    * Remove all the entries from the index.
    */
-  def truncate(): Unit
+  protected def truncate(): Unit
 
   /**
    * Remove all entries from the index which have an offset greater than or equal to the given offset.
@@ -214,6 +218,14 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    */
   def truncateTo(offset: Long): Unit
 
+  /**
+   * Remove all the entries from the index and resize the index to the max index size.
+   */
+  def reset(): Unit = {
+    truncate()
+    resize(maxIndexSize)
+  }
+
   protected def safeForceUnmap(): Unit = {
     try forceUnmap()
     catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/CorruptIndexException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CorruptIndexException.scala b/core/src/main/scala/kafka/log/CorruptIndexException.scala
new file mode 100644
index 0000000..b39ee5b
--- /dev/null
+++ b/core/src/main/scala/kafka/log/CorruptIndexException.scala
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+class CorruptIndexException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c7ec3bd..a46fffe 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -18,7 +18,7 @@
 package kafka.log
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, NoSuchFileException}
 import java.text.NumberFormat
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
@@ -266,28 +266,33 @@ class Log(@volatile var dir: File,
   }
 
   private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
+
+    def deleteIndicesIfExist(baseFile: File, swapFile: File, fileType: String): Unit = {
+      info(s"Found $fileType file ${swapFile.getAbsolutePath} from interrupted swap operation. Deleting index files (if they exist).")
+      val offset = offsetFromFile(baseFile)
+      Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath)
+      Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath)
+      Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath)
+    }
+
     var swapFiles = Set[File]()
 
     for (file <- dir.listFiles if file.isFile) {
       if (!file.canRead)
-        throw new IOException("Could not read file " + file)
+        throw new IOException(s"Could not read file $file")
       val filename = file.getName
       if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
-        // if the file ends in .deleted or .cleaned, delete it
+        debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
         Files.deleteIfExists(file.toPath)
-      } else if(filename.endsWith(SwapFileSuffix)) {
+      } else if (filename.endsWith(SwapFileSuffix)) {
         // we crashed in the middle of a swap operation, to recover:
-        // if a log, delete the .index file, complete the swap operation later
-        // if an index just delete it, it will be rebuilt
+        // if a log, delete the index files, complete the swap operation later
+        // if an index just delete the index files, they will be rebuilt
         val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
         if (isIndexFile(baseFile)) {
-          Files.deleteIfExists(file.toPath)
+          deleteIndicesIfExist(baseFile, file, "index")
         } else if (isLogFile(baseFile)) {
-          // delete the index files
-          val offset = offsetFromFile(baseFile)
-          Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath)
-          Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath)
-          Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath)
+          deleteIndicesIfExist(baseFile, file, "log")
           swapFiles += file
         }
       }
@@ -305,46 +310,29 @@ class Log(@volatile var dir: File,
         val offset = offsetFromFile(file)
         val logFile = Log.logFile(dir, offset)
         if (!logFile.exists) {
-          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+          warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
           Files.deleteIfExists(file.toPath)
         }
       } else if (isLogFile(file)) {
         // if it's a log file, load the corresponding log segment
-        val startOffset = offsetFromFile(file)
-        val indexFile = Log.offsetIndexFile(dir, startOffset)
-        val timeIndexFile = Log.timeIndexFile(dir, startOffset)
-        val txnIndexFile = Log.transactionIndexFile(dir, startOffset)
-
-        val indexFileExists = indexFile.exists()
-        val timeIndexFileExists = timeIndexFile.exists()
-        val segment = new LogSegment(dir = dir,
-          startOffset = startOffset,
-          indexIntervalBytes = config.indexInterval,
-          maxIndexSize = config.maxIndexSize,
-          rollJitterMs = config.randomSegmentJitter,
+        val baseOffset = offsetFromFile(file)
+        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
+        val segment = LogSegment.open(dir = dir,
+          baseOffset = baseOffset,
+          config,
           time = time,
           fileAlreadyExists = true)
 
-        if (indexFileExists) {
-          try {
-            segment.index.sanityCheck()
-            // Resize the time index file to 0 if it is newly created.
-            if (!timeIndexFileExists)
-              segment.timeIndex.resize(0)
-            segment.timeIndex.sanityCheck()
-            segment.txnIndex.sanityCheck()
-          } catch {
-            case e: java.lang.IllegalArgumentException =>
-              warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
-                s"${indexFile.getAbsolutePath}, and ${txnIndexFile.getAbsolutePath} and rebuilding index...")
-              Files.deleteIfExists(timeIndexFile.toPath)
-              Files.delete(indexFile.toPath)
-              segment.txnIndex.delete()
-              recoverSegment(segment)
-          }
-        } else {
-          error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
-          recoverSegment(segment)
+        try segment.sanityCheck(timeIndexFileNewlyCreated)
+        catch {
+          case _: NoSuchFileException =>
+            error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
+              "recovering segment and rebuilding index files...")
+            recoverSegment(segment)
+          case e: CorruptIndexException =>
+            warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
+              s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
+            recoverSegment(segment)
         }
         addSegment(segment)
       }
@@ -378,24 +366,15 @@ class Log(@volatile var dir: File,
   private def completeSwapOperations(swapFiles: Set[File]): Unit = {
     for (swapFile <- swapFiles) {
       val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
-      val startOffset = offsetFromFile(logFile)
-      val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
-      val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
-      val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
-      val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
-      val txnIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TxnIndexFileSuffix) + SwapFileSuffix)
-      val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
-      val swapSegment = new LogSegment(FileRecords.open(swapFile),
-        index = index,
-        timeIndex = timeIndex,
-        txnIndex = txnIndex,
-        baseOffset = startOffset,
-        indexIntervalBytes = config.indexInterval,
-        rollJitterMs = config.randomSegmentJitter,
-        time = time)
-      info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
+      val baseOffset = offsetFromFile(logFile)
+      val swapSegment = LogSegment.open(swapFile.getParentFile,
+        baseOffset = baseOffset,
+        config,
+        time = time,
+        fileSuffix = SwapFileSuffix)
+      info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
       recoverSegment(swapSegment)
-      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset())
+      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset)
       replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
     }
   }
@@ -417,11 +396,9 @@ class Log(@volatile var dir: File,
 
     if (logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      addSegment(new LogSegment(dir = dir,
-        startOffset = 0,
-        indexIntervalBytes = config.indexInterval,
-        maxIndexSize = config.maxIndexSize,
-        rollJitterMs = config.randomSegmentJitter,
+      addSegment(LogSegment.open(dir = dir,
+        baseOffset = 0,
+        config,
         time = time,
         fileAlreadyExists = false,
         initFileSize = this.initFileSize,
@@ -430,8 +407,7 @@ class Log(@volatile var dir: File,
     } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
       val nextOffset = recoverLog()
       // reset the index size of the currently active log segment to allow more entries
-      activeSegment.index.resize(config.maxIndexSize)
-      activeSegment.timeIndex.resize(config.maxIndexSize)
+      activeSegment.resizeIndexes(config.maxIndexSize)
       nextOffset
     } else 0
   }
@@ -467,12 +443,12 @@ class Log(@volatile var dir: File,
         if (truncatedBytes > 0) {
           // we had an invalid message, delete all remaining log
           warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name,
-            segment.nextOffset()))
+            segment.readNextOffset))
           unflushed.foreach(deleteSegment)
         }
       }
     }
-    recoveryPoint = activeSegment.nextOffset
+    recoveryPoint = activeSegment.readNextOffset
     recoveryPoint
   }
 
@@ -1293,12 +1269,9 @@ class Log(@volatile var dir: File,
   private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
     val segment = activeSegment
     val now = time.milliseconds
-    val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
-    if (segment.size > config.segmentSize - messagesSize ||
-        (segment.size > 0 && reachedRollMs) ||
-        segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
+    if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
       debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
-          s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
+          s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
           s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
           s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
       /*
@@ -1325,7 +1298,7 @@ class Log(@volatile var dir: File,
    */
   def roll(expectedNextOffset: Long = 0): LogSegment = {
     maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
-      val start = time.nanoseconds
+      val start = time.hiResClockMs()
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         val newOffset = math.max(expectedNextOffset, logEndOffset)
@@ -1334,17 +1307,11 @@ class Log(@volatile var dir: File,
         val timeIdxFile = timeIndexFile(dir, newOffset)
         val txnIdxFile = transactionIndexFile(dir, newOffset)
         for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-          warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
-          file.delete()
+          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
+          Files.delete(file.toPath)
         }
 
-        Option(segments.lastEntry).foreach { entry =>
-          val seg = entry.getValue
-          seg.onBecomeInactiveSegment()
-          seg.index.trimToValidSize()
-          seg.timeIndex.trimToValidSize()
-          seg.log.trim()
-        }
+        Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
 
         // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
         // offset align with the new segment offset since this ensures we can recover the segment by beginning
@@ -1354,11 +1321,9 @@ class Log(@volatile var dir: File,
         producerStateManager.updateMapEndOffset(newOffset)
         producerStateManager.takeSnapshot()
 
-        val segment = new LogSegment(dir,
-          startOffset = newOffset,
-          indexIntervalBytes = config.indexInterval,
-          maxIndexSize = config.maxIndexSize,
-          rollJitterMs = config.randomSegmentJitter,
+        val segment = LogSegment.open(dir,
+          baseOffset = newOffset,
+          config,
           time = time,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
@@ -1372,7 +1337,7 @@ class Log(@volatile var dir: File,
         // schedule an asynchronous flush of the old segment
         scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
 
-        info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0 * 1000.0)))
+        info(s"Rolled new log segment for '$name' in ${time.hiResClockMs() - start} ms.")
 
         segment
       }
@@ -1457,7 +1422,7 @@ class Log(@volatile var dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         removeLogMetrics()
-        logSegments.foreach(_.delete())
+        logSegments.foreach(_.deleteIfExists())
         segments.clear()
         _leaderEpochCache.clear()
         Utils.delete(dir)
@@ -1535,11 +1500,9 @@ class Log(@volatile var dir: File,
         checkIfMemoryMappedBufferClosed()
         val segmentsToDelete = logSegments.toList
         segmentsToDelete.foreach(deleteSegment)
-        addSegment(new LogSegment(dir,
-          newOffset,
-          indexIntervalBytes = config.indexInterval,
-          maxIndexSize = config.maxIndexSize,
-          rollJitterMs = config.randomSegmentJitter,
+        addSegment(LogSegment.open(dir,
+          baseOffset = newOffset,
+          config = config,
           time = time,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
@@ -1578,11 +1541,10 @@ class Log(@volatile var dir: File,
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
     lock synchronized {
-      val floor = segments.floorKey(from)
-      if (floor eq null)
-        segments.headMap(to).values.asScala
-      else
-        segments.subMap(floor, true, to, false).values.asScala
+      val view = Option(segments.floorKey(from)).map { floor =>
+        segments.subMap(floor, to)
+      }.getOrElse(segments.headMap(to))
+      view.values.asScala
     }
   }
 
@@ -1622,9 +1584,9 @@ class Log(@volatile var dir: File,
   private def asyncDeleteSegment(segment: LogSegment) {
     segment.changeFileSuffixes("", Log.DeletedFileSuffix)
     def deleteSeg() {
-      info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
+      info(s"Deleting segment ${segment.baseOffset} from log $name.")
       maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
-        segment.delete()
+        segment.deleteIfExists()
       }
     }
     scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
@@ -1786,13 +1748,14 @@ object Log {
   }
 
   /**
-   * Construct a log file name in the given dir with the given base offset
+   * Construct a log file name in the given dir with the given base offset and the given suffix
    *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
+   * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.)
    */
-  def logFile(dir: File, offset: Long): File =
-    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
+  def logFile(dir: File, offset: Long, suffix: String = ""): File =
+    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix)
 
   /**
     * Return a directory name to rename the log directory to for async deletion. The name will be in the following
@@ -1824,22 +1787,24 @@ object Log {
   }
 
   /**
-   * Construct an index file name in the given dir using the given base offset
+   * Construct an index file name in the given dir using the given base offset and the given suffix
    *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
+   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
    */
-  def offsetIndexFile(dir: File, offset: Long): File =
-    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
+  def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File =
+    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix + suffix)
 
   /**
-   * Construct a time index file name in the given dir using the given base offset
+   * Construct a time index file name in the given dir using the given base offset and the given suffix
    *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
+   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
    */
-  def timeIndexFile(dir: File, offset: Long): File =
-    new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
+  def timeIndexFile(dir: File, offset: Long, suffix: String = ""): File =
+    new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix + suffix)
 
   /**
    * Construct a producer id snapshot file using the given offset.
@@ -1850,8 +1815,15 @@ object Log {
   def producerSnapshotFile(dir: File, offset: Long): File =
     new File(dir, filenamePrefixFromOffset(offset) + ProducerSnapshotFileSuffix)
 
-  def transactionIndexFile(dir: File, offset: Long): File =
-    new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix)
+  /**
+   * Construct a transaction index file name in the given dir using the given base offset and the given suffix
+   *
+   * @param dir The directory in which the log will reside
+   * @param offset The base offset of the log file
+   * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
+   */
+  def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File =
+    new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix + suffix)
 
   def offsetFromFile(file: File): Long = {
     val filename = file.getName

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 27229f3..c5c0d49 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io.{File, IOException}
 import java.nio._
+import java.nio.file.Files
 import java.util.Date
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
@@ -422,26 +423,20 @@ private[log] class Cleaner(val id: Int,
                                  deleteHorizonMs: Long,
                                  stats: CleanerStats) {
 
-    def deleteAndGetCleanedFile(file: File): File = {
-      val f = new File(file.getPath + Log.CleanedFileSuffix)
-      f.delete()
-      f
+    def deleteCleanedFileIfExists(file: File): Unit = {
+      Files.deleteIfExists(new File(file.getPath + Log.CleanedFileSuffix).toPath)
     }
 
     // create a new segment with a suffix appended to the name of the log and indexes
     val firstSegment = segments.head
-    val logFile = deleteAndGetCleanedFile(firstSegment.log.file)
-    val indexFile = deleteAndGetCleanedFile(firstSegment.index.file)
-    val timeIndexFile = deleteAndGetCleanedFile(firstSegment.timeIndex.file)
-    val txnIndexFile = deleteAndGetCleanedFile(firstSegment.txnIndex.file)
-
-    val startOffset = firstSegment.baseOffset
-    val records = FileRecords.open(logFile, false, log.initFileSize, log.config.preallocate)
-    val index = new OffsetIndex(indexFile, startOffset, firstSegment.index.maxIndexSize)
-    val timeIndex = new TimeIndex(timeIndexFile, startOffset, firstSegment.timeIndex.maxIndexSize)
-    val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
-    val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset, firstSegment.indexIntervalBytes,
-      log.config.randomSegmentJitter, time)
+    deleteCleanedFileIfExists(firstSegment.log.file)
+    deleteCleanedFileIfExists(firstSegment.offsetIndex.file)
+    deleteCleanedFileIfExists(firstSegment.timeIndex.file)
+    deleteCleanedFileIfExists(firstSegment.txnIndex.file)
+
+    val baseOffset = firstSegment.baseOffset
+    val cleaned = LogSegment.open(log.dir, baseOffset, log.config, time, fileSuffix = Log.CleanedFileSuffix,
+      initFileSize = log.initFileSize, preallocate = log.config.preallocate)
 
     try {
       // clean segments into the new destination segment
@@ -454,7 +449,7 @@ private[log] class Cleaner(val id: Int,
         val startOffset = currentSegment.baseOffset
         val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
-        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(txnIndex))
+        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))
 
         val retainDeletes = currentSegment.lastModified > deleteHorizonMs
         info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
@@ -465,18 +460,7 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      // trim log segment
-      cleaned.log.trim()
-
-      // trim excess index
-      index.trimToValidSize()
-
-      // Append the last index entry
       cleaned.onBecomeInactiveSegment()
-
-      // trim time index
-      timeIndex.trimToValidSize()
-
       // flush new segment to disk before swap
       cleaned.flush()
 
@@ -485,12 +469,16 @@ private[log] class Cleaner(val id: Int,
       cleaned.lastModified = modified
 
       // swap in new segment
-      info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+      info(s"Swapping in cleaned segment ${cleaned.baseOffset} for segment(s) ${segments.map(_.baseOffset).mkString(",")} " +
+        s"in log ${log.name}")
       log.replaceSegments(cleaned, segments)
     } catch {
       case e: LogCleaningAbortedException =>
-        cleaned.delete()
-        throw e
+        try cleaned.deleteIfExists()
+        catch {
+          case deleteException: Exception =>
+            e.addSuppressed(deleteException)
+        } finally throw e
     }
   }
 
@@ -657,17 +645,17 @@ private[log] class Cleaner(val id: Int,
     while(segs.nonEmpty) {
       var group = List(segs.head)
       var logSize = segs.head.size.toLong
-      var indexSize = segs.head.index.sizeInBytes.toLong
+      var indexSize = segs.head.offsetIndex.sizeInBytes.toLong
       var timeIndexSize = segs.head.timeIndex.sizeInBytes.toLong
       segs = segs.tail
       while(segs.nonEmpty &&
             logSize + segs.head.size <= maxSize &&
-            indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
+            indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
             timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
             lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
-        indexSize += segs.head.index.sizeInBytes
+        indexSize += segs.head.offsetIndex.sizeInBytes
         timeIndexSize += segs.head.timeIndex.sizeInBytes
         segs = segs.tail
       }
@@ -748,7 +736,7 @@ private[log] class Cleaner(val id: Int,
                                        maxLogMessageSize: Int,
                                        transactionMetadata: CleanedTransactionMetadata,
                                        stats: CleanerStats): Boolean = {
-    var position = segment.index.lookup(startOffset).position
+    var position = segment.offsetIndex.lookup(startOffset).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6db2a50..45c820b 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -17,9 +17,10 @@
 package kafka.log
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
+
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
@@ -27,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time}
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -41,21 +42,47 @@ import scala.math._
  * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
  *
  * @param log The message set containing log entries
- * @param index The offset index
+ * @param offsetIndex The offset index
  * @param timeIndex The timestamp index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param time The time instance
  */
 @nonthreadsafe
-class LogSegment(val log: FileRecords,
-                 val index: OffsetIndex,
-                 val timeIndex: TimeIndex,
-                 val txnIndex: TransactionIndex,
-                 val baseOffset: Long,
-                 val indexIntervalBytes: Int,
-                 val rollJitterMs: Long,
-                 time: Time) extends Logging {
+class LogSegment private[log] (val log: FileRecords,
+                               val offsetIndex: OffsetIndex,
+                               val timeIndex: TimeIndex,
+                               val txnIndex: TransactionIndex,
+                               val baseOffset: Long,
+                               val indexIntervalBytes: Int,
+                               val rollJitterMs: Long,
+                               val maxSegmentMs: Long,
+                               val maxSegmentBytes: Int,
+                               val time: Time) extends Logging {
+
+  def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
+    val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
+    size > maxSegmentBytes - messagesSize ||
+      (size > 0 && reachedRollMs) ||
+      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
+  }
+
+  def resizeIndexes(size: Int): Unit = {
+    offsetIndex.resize(size)
+    timeIndex.resize(size)
+  }
+
+  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
+    if (offsetIndex.file.exists) {
+      offsetIndex.sanityCheck()
+      // Resize the time index file to 0 if it is newly created.
+      if (timeIndexFileNewlyCreated)
+        timeIndex.resize(0)
+      timeIndex.sanityCheck()
+      txnIndex.sanityCheck()
+    }
+    else throw new NoSuchFileException(s"Offset index file ${offsetIndex.file.getAbsolutePath} does not exist")
+  }
 
   private var created = time.milliseconds
 
@@ -69,17 +96,6 @@ class LogSegment(val log: FileRecords,
   @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
   @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset
 
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time,
-           fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
-    this(FileRecords.open(Log.logFile(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
-         new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
-         new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
-         new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)),
-         startOffset,
-         indexIntervalBytes,
-         rollJitterMs,
-         time)
-
   /* Return the size in bytes of this log segment */
   def size: Int = log.sizeInBytes()
 
@@ -126,7 +142,7 @@ class LogSegment(val log: FileRecords,
       }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        index.append(firstOffset, physicalPosition)
+        offsetIndex.append(firstOffset, physicalPosition)
         timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
         bytesSinceLastIndexEntry = 0
       }
@@ -170,7 +186,7 @@ class LogSegment(val log: FileRecords,
    */
   @threadsafe
   private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
-    val mapping = index.lookup(offset)
+    val mapping = offsetIndex.lookup(offset)
     log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
   }
 
@@ -237,7 +253,7 @@ class LogSegment(val log: FileRecords,
   }
 
    def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] =
-     index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
+     offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
 
   /**
    * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
@@ -250,11 +266,9 @@ class LogSegment(val log: FileRecords,
    */
   @nonthreadsafe
   def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
-    index.truncate()
-    index.resize(index.maxIndexSize)
-    timeIndex.truncate()
-    timeIndex.resize(timeIndex.maxIndexSize)
-    txnIndex.truncate()
+    offsetIndex.reset()
+    timeIndex.reset()
+    txnIndex.reset()
     var validBytes = 0
     var lastIndexEntry = 0
     maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
@@ -269,9 +283,9 @@ class LogSegment(val log: FileRecords,
         }
 
         // Build offset index
-        if(validBytes - lastIndexEntry > indexIntervalBytes) {
+        if (validBytes - lastIndexEntry > indexIntervalBytes) {
           val startOffset = batch.baseOffset
-          index.append(startOffset, validBytes)
+          offsetIndex.append(startOffset, validBytes)
           timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
           lastIndexEntry = validBytes
         }
@@ -295,7 +309,7 @@ class LogSegment(val log: FileRecords,
       debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
 
     log.truncateTo(validBytes)
-    index.trimToValidSize()
+    offsetIndex.trimToValidSize()
     // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
     timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
     timeIndex.trimToValidSize()
@@ -308,7 +322,7 @@ class LogSegment(val log: FileRecords,
     maxTimestampSoFar = lastTimeIndexEntry.timestamp
     offsetOfMaxTimestamp = lastTimeIndexEntry.offset
 
-    val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+    val offsetPosition = offsetIndex.lookup(lastTimeIndexEntry.offset)
     // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
     val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
     if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
@@ -334,11 +348,11 @@ class LogSegment(val log: FileRecords,
     val mapping = translateOffset(offset)
     if (mapping == null)
       return 0
-    index.truncateTo(offset)
+    offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
-    index.resize(index.maxIndexSize)
+    offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
     val bytesTruncated = log.truncateTo(mapping.position)
     if(log.sizeInBytes == 0) {
@@ -356,8 +370,8 @@ class LogSegment(val log: FileRecords,
    * Note that this is expensive.
    */
   @threadsafe
-  def nextOffset(): Long = {
-    val ms = read(index.lastOffset, None, log.sizeInBytes)
+  def readNextOffset: Long = {
+    val ms = read(offsetIndex.lastOffset, None, log.sizeInBytes)
     if (ms == null)
       baseOffset
     else
@@ -373,7 +387,7 @@ class LogSegment(val log: FileRecords,
   def flush() {
     LogFlushStats.logFlushTimer.time {
       log.flush()
-      index.flush()
+      offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
     }
@@ -385,7 +399,7 @@ class LogSegment(val log: FileRecords,
    */
   def updateDir(dir: File): Unit = {
     log.setFile(new File(dir, log.file.getName))
-    index.file = new File(dir, index.file.getName)
+    offsetIndex.file = new File(dir, offsetIndex.file.getName)
     timeIndex.file = new File(dir, timeIndex.file.getName)
     txnIndex.file = new File(dir, txnIndex.file.getName)
   }
@@ -396,17 +410,21 @@ class LogSegment(val log: FileRecords,
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
-    index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+    offsetIndex.renameTo(new File(CoreUtils.replaceSuffix(offsetIndex.file.getPath, oldSuffix, newSuffix)))
     timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
   }
 
   /**
-   * Append the largest time index entry to the time index when this log segment become inactive segment.
-   * This entry will be used to decide when to delete the segment.
+   * Append the largest time index entry to the time index and trim the log and indexes.
+   *
+   * The time index entry appended will be used to decide when to delete the segment.
    */
   def onBecomeInactiveSegment() {
     timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+    offsetIndex.trimToValidSize()
+    timeIndex.trimToValidSize()
+    log.trim()
   }
 
   /**
@@ -455,7 +473,7 @@ class LogSegment(val log: FileRecords,
   def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
-    val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
+    val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
     Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
@@ -468,7 +486,7 @@ class LogSegment(val log: FileRecords,
    */
   def close() {
     CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true), this)
-    CoreUtils.swallow(index.close(), this)
+    CoreUtils.swallow(offsetIndex.close(), this)
     CoreUtils.swallow(timeIndex.close(), this)
     CoreUtils.swallow(log.close(), this)
     CoreUtils.swallow(txnIndex.close(), this)
@@ -478,7 +496,7 @@ class LogSegment(val log: FileRecords,
     * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
     */
   def closeHandlers() {
-    CoreUtils.swallow(index.closeHandler(), this)
+    CoreUtils.swallow(offsetIndex.closeHandler(), this)
     CoreUtils.swallow(timeIndex.closeHandler(), this)
     CoreUtils.swallow(log.closeHandlers(), this)
     CoreUtils.swallow(txnIndex.close(), this)
@@ -487,19 +505,25 @@ class LogSegment(val log: FileRecords,
   /**
    * Delete this log segment from the filesystem.
    */
-  def delete() {
-    val deletedLog = log.delete()
-    val deletedIndex = index.delete()
-    val deletedTimeIndex = timeIndex.delete()
-    val deletedTxnIndex = txnIndex.delete()
-    if (!deletedLog && log.file.exists)
-      throw new IOException("Delete of log " + log.file.getName + " failed.")
-    if (!deletedIndex && index.file.exists)
-      throw new IOException("Delete of index " + index.file.getName + " failed.")
-    if (!deletedTimeIndex && timeIndex.file.exists)
-      throw new IOException("Delete of time index " + timeIndex.file.getName + " failed.")
-    if (!deletedTxnIndex && txnIndex.file.exists)
-      throw new IOException("Delete of transaction index " + txnIndex.file.getName + " failed.")
+  def deleteIfExists() {
+    def delete(delete: () => Boolean, fileType: String, file: File, logIfMissing: Boolean): Unit = {
+      try {
+        if (delete())
+          info(s"Deleted $fileType ${file.getAbsolutePath}.")
+        else if (logIfMissing)
+          info(s"Failed to delete $fileType ${file.getAbsolutePath} because it does not exist.")
+      }
+      catch {
+        case e: IOException => throw new IOException(s"Delete of $fileType ${file.getAbsolutePath} failed.", e)
+      }
+    }
+
+    CoreUtils.tryAll(Seq(
+      () => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true),
+      () => delete(offsetIndex.deleteIfExists _, "offset index", offsetIndex.file, logIfMissing = true),
+      () => delete(timeIndex.deleteIfExists _, "time index", timeIndex.file, logIfMissing = true),
+      () => delete(txnIndex.deleteIfExists _, "transaction index", txnIndex.file, logIfMissing = false)
+    ))
   }
 
   /**
@@ -518,11 +542,31 @@ class LogSegment(val log: FileRecords,
   def lastModified_=(ms: Long) = {
     val fileTime = FileTime.fromMillis(ms)
     Files.setLastModifiedTime(log.file.toPath, fileTime)
-    Files.setLastModifiedTime(index.file.toPath, fileTime)
+    Files.setLastModifiedTime(offsetIndex.file.toPath, fileTime)
     Files.setLastModifiedTime(timeIndex.file.toPath, fileTime)
   }
 }
 
+object LogSegment {
+
+  def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
+    val maxIndexSize = config.maxIndexSize
+    new LogSegment(
+      FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
+      new OffsetIndex(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      new TimeIndex(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
+      baseOffset,
+      indexIntervalBytes = config.indexInterval,
+      rollJitterMs = config.randomSegmentJitter,
+      maxSegmentMs = config.segmentMs,
+      maxSegmentBytes = config.segmentSize,
+      time)
+  }
+
+}
+
 object LogFlushStats extends KafkaMetricsGroup {
   val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index eb15842..523c88c 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -187,12 +187,12 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   }
 
   override def sanityCheck() {
-    require(_entries == 0 || _lastOffset > baseOffset,
-            s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
-                s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
-    require(length % entrySize == 0,
-            "Index file " + file.getAbsolutePath + " is corrupt, found " + length +
-            " bytes which is not positive or not a multiple of 8.")
+    if (_entries != 0 && _lastOffset <= baseOffset)
+      throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " +
+        s"but the last offset is ${_lastOffset} which is no greater than the base offset $baseOffset.")
+    if (length % entrySize != 0)
+      throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found $length bytes which is " +
+        s"neither positive nor a multiple of $entrySize.")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 47ab2e5..e505f36 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -208,14 +208,15 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
   override def sanityCheck() {
     val lastTimestamp = lastEntry.timestamp
     val lastOffset = lastEntry.offset
-    require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
-      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
-          s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}")
-    require(_entries == 0 || lastOffset >= baseOffset,
-      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
-          s"is $lastOffset which is smaller than the first offset $baseOffset")
-    require(length % entrySize == 0,
-      "Time index file " + file.getAbsolutePath + " is corrupt, found " + length +
-          " bytes which is not positive or not a multiple of 12.")
+    if (_entries != 0 && lastTimestamp < timestamp(mmap, 0))
+      throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
+        s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " +
+        s"${timestamp(mmap, 0)}")
+    if (_entries != 0 && lastOffset < baseOffset)
+      throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
+        s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset")
+    if (length % entrySize != 0)
+      throw new CorruptIndexException(s"Time index file ${file.getAbsolutePath} is corrupt, found $length bytes " +
+        s"which is neither positive nor a multiple of $entrySize.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/log/TransactionIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index 9afe009..349f0ce 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.io.{File, IOException}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
-import java.nio.file.StandardOpenOption
+import java.nio.file.{Files, StandardOpenOption}
 
 import kafka.utils.{Logging, nonthreadsafe}
 import org.apache.kafka.common.KafkaException
@@ -61,12 +61,16 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
 
-  def delete(): Boolean = {
-    maybeChannel.forall { channel =>
-      channel.force(true)
-      close()
-      file.delete()
-    }
+  /**
+   * Delete this index.
+   *
+   * @throws IOException if deletion fails due to an I/O error
+   * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+   *         not exist
+   */
+  def deleteIfExists(): Boolean = {
+    close()
+    Files.deleteIfExists(file.toPath)
   }
 
   private def channel: FileChannel = {
@@ -84,7 +88,10 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     channel
   }
 
-  def truncate() = {
+  /**
+   * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
+   */
+  def reset(): Unit = {
     maybeChannel.foreach(_.truncate(0))
     lastOffset = None
   }
@@ -171,11 +178,17 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
   }
 
+  /**
+   * Do a basic sanity check on this index to detect obvious problems.
+   *
+   * @throws CorruptIndexException if any problems are found.
+   */
   def sanityCheck(): Unit = {
     val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
     for ((abortedTxn, _) <- iterator(() => buffer)) {
-      require(abortedTxn.lastOffset >= startOffset, s"Last offset of aborted transaction $abortedTxn is less than " +
-        s"start offset $startOffset")
+      if (abortedTxn.lastOffset < startOffset)
+        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn is less than start offset " +
+          s"$startOffset")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index efd4d1e..938828c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -101,6 +101,30 @@ object CoreUtils extends Logging {
   def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f)))
 
   /**
+   * Invokes every function in `all` even if one or more functions throws an exception.
+   *
+   * If any of the functions throws an exception, the first one will be rethrown at the end with subsequent exceptions
+   * added as suppressed exceptions.
+   */
+  // Note that this is a generalised version of `Utils.closeAll`. We could potentially make it more general by
+  // changing the signature to `def tryAll[R](all: Seq[() => R]): Seq[R]`
+  def tryAll(all: Seq[() => Unit]): Unit = {
+    var exception: Throwable = null
+    all.foreach { element =>
+      try element.apply()
+      catch {
+        case e: Throwable =>
+          if (exception != null)
+            exception.addSuppressed(e)
+          else
+            exception = e
+      }
+    }
+    if (exception != null)
+      throw exception
+  }
+
+  /**
    * Register the given mbean with the platform mbean server,
    * unregistering any mbean that was there before. Note,
    * this method will not throw an exception if the registration

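A typical use of tryAll is a batch of cleanup actions where each one must be attempted regardless of earlier failures. A hedged usage sketch (closeAll is a hypothetical helper, not part of CoreUtils):

    import kafka.utils.CoreUtils

    // Hypothetical helper: close every resource even if some close() calls throw.
    // The first exception is rethrown at the end, with later ones attached as
    // suppressed exceptions, matching the tryAll contract above.
    def closeAll(resources: Seq[AutoCloseable]): Unit =
      CoreUtils.tryAll(resources.map(r => () => r.close()))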
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 517e876..c12f617 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -826,7 +826,7 @@ class LogCleanerTest extends JUnitSuite {
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
 
     // check grouping by index size
-    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
+    val indexSize = log.logSegments.take(groupSize).map(_.offsetIndex.sizeInBytes).sum + 1
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
@@ -856,7 +856,7 @@ class LogCleanerTest extends JUnitSuite {
     val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
     log.appendAsFollower(records)
     log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
-    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
+    assertEquals(Int.MaxValue, log.activeSegment.offsetIndex.lastOffset)
 
     // grouping should result in a single group with maximum relative offset of Int.MaxValue
     var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
@@ -877,7 +877,7 @@ class LogCleanerTest extends JUnitSuite {
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(log.numberOfSegments - 1, groups.size)
     for (group <- groups)
-      assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.offsetIndex.lastOffset - group.head.offsetIndex.baseOffset <= Int.MaxValue)
     checkSegmentOrder(groups)
   }
 
@@ -912,14 +912,14 @@ class LogCleanerTest extends JUnitSuite {
     log.appendAsFollower(record4)
 
     assertTrue("Actual offset range should be > Int.MaxValue", log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue)
-    assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.index.lastOffset - log.logStartOffset <= Int.MaxValue)
+    assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.offsetIndex.lastOffset - log.logStartOffset <= Int.MaxValue)
 
     // grouping should result in two groups because the second segment takes the offset range > MaxInt
     val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(2, groups.size)
 
     for (group <- groups)
-      assertTrue("Relative offset greater than Int.MaxValue", group.last.nextOffset() - 1 - group.head.baseOffset <= Int.MaxValue)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.readNextOffset - 1 - group.head.baseOffset <= Int.MaxValue)
     checkSegmentOrder(groups)
   }
 
@@ -978,7 +978,7 @@ class LogCleanerTest extends JUnitSuite {
 
     val config = LogConfig.fromProps(logConfig.originals, logProps)
 
-    def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
+    def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Int]): Log = {
       // Recover log file and check that after recovery, keys are as expected
       // and all temporary files have been deleted
       val recoveredLog = makeLog(config = config)
@@ -995,7 +995,7 @@ class LogCleanerTest extends JUnitSuite {
     // create a log and append some messages
     var log = makeLog(config = config)
     var messageCount = 0
-    while(log.numberOfSegments < 10) {
+    while (log.numberOfSegments < 10) {
       log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
@@ -1008,7 +1008,10 @@ class LogCleanerTest extends JUnitSuite {
 
     // clean the log
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    // clear scheduler so that async deletes don't run
+    time.scheduler.clear()
     var cleanedKeys = keysInLog(log)
+    log.close()
 
     // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
@@ -1020,7 +1023,10 @@ class LogCleanerTest extends JUnitSuite {
 
     // clean again
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    // clear scheduler so that async deletes don't run
+    time.scheduler.clear()
     cleanedKeys = keysInLog(log)
+    log.close()
 
     // 2) Simulate recovery just after swap file is created, before old segment files are
     //    renamed to .deleted. Clean operation is resumed during recovery.
@@ -1031,13 +1037,15 @@ class LogCleanerTest extends JUnitSuite {
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
-    while(log.numberOfSegments < 10) {
+    while (log.numberOfSegments < 10) {
       log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    // clear scheduler so that async deletes don't run
+    time.scheduler.clear()
     cleanedKeys = keysInLog(log)
 
     // 3) Simulate recovery after swap file is created and old segments files are renamed
@@ -1046,18 +1054,22 @@ class LogCleanerTest extends JUnitSuite {
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
-    while(log.numberOfSegments < 10) {
+    while (log.numberOfSegments < 10) {
       log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    // clear scheduler so that async deletes don't run
+    time.scheduler.clear()
     cleanedKeys = keysInLog(log)
+    log.close()
 
     // 4) Simulate recovery after swap is complete, but async deletion
     //    is not yet complete. Clean operation is resumed during recovery.
-    recoverAndCheck(config, cleanedKeys)
+    log = recoverAndCheck(config, cleanedKeys)
+    log.close()
   }
 
   @Test
@@ -1242,9 +1254,7 @@ class LogCleanerTest extends JUnitSuite {
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
 
-  private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
-
-  private def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
+  private def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = _ => (), maxMessageSize: Int = 64*1024) =
     new Cleaner(id = 0,
                 offsetMap = new FakeOffsetMap(capacity),
                 ioBufferSize = maxMessageSize,
@@ -1255,8 +1265,7 @@ class LogCleanerTest extends JUnitSuite {
                 checkDone = checkDone)
 
   private def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
-    for((key, value) <- seq)
-      yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
+    for ((key, value) <- seq) yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
   }
 
   private def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 4c1f4ae..8a04914 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -308,18 +308,18 @@ class LogManagerTest {
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
     val activeSegment = log.activeSegment
     val logName = activeSegment.log.file.getName
-    val indexName = activeSegment.index.file.getName
+    val indexName = activeSegment.offsetIndex.file.getName
     val timeIndexName = activeSegment.timeIndex.file.getName
     val txnIndexName = activeSegment.txnIndex.file.getName
     val indexFilesOnDiskBeforeDelete = activeSegment.log.file.getParentFile.listFiles.filter(_.getName.endsWith("index"))
 
     val removedLog = logManager.asyncDelete(new TopicPartition(name, 0))
     val removedSegment = removedLog.activeSegment
-    val indexFilesAfterDelete = Seq(removedSegment.index.file, removedSegment.timeIndex.file,
+    val indexFilesAfterDelete = Seq(removedSegment.offsetIndex.file, removedSegment.timeIndex.file,
       removedSegment.txnIndex.file)
 
     assertEquals(new File(removedLog.dir, logName), removedSegment.log.file)
-    assertEquals(new File(removedLog.dir, indexName), removedSegment.index.file)
+    assertEquals(new File(removedLog.dir, indexName), removedSegment.offsetIndex.file)
     assertEquals(new File(removedLog.dir, timeIndexName), removedSegment.timeIndex.file)
     assertEquals(new File(removedLog.dir, txnIndexName), removedSegment.txnIndex.file)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index cef2bca..469b3cc 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.log
+package kafka.log
 
 import java.io.File
 
@@ -37,18 +37,12 @@ class LogSegmentTest {
 
   /* create a segment with the given base offset */
   def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
-    val msFile = TestUtils.tempFile()
-    val ms = FileRecords.open(msFile)
-    val idxFile = TestUtils.tempFile()
-    val timeIdxFile = TestUtils.tempFile()
-    val txnIdxFile = TestUtils.tempFile()
-    idxFile.delete()
-    timeIdxFile.delete()
-    txnIdxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, 1000)
-    val timeIdx = new TimeIndex(timeIdxFile, offset, 1500)
-    val txnIndex = new TransactionIndex(offset, txnIdxFile)
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, Time.SYSTEM)
+    val ms = FileRecords.open(Log.logFile(logDir, offset))
+    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
+    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
+    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
+      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
     segments += seg
     seg
   }
@@ -66,12 +60,7 @@ class LogSegmentTest {
 
   @After
   def teardown() {
-    for(seg <- segments) {
-      seg.index.delete()
-      seg.timeIndex.delete()
-      seg.txnIndex.delete()
-      seg.log.delete()
-    }
+    segments.foreach(_.close())
     Utils.delete(logDir)
   }
 
@@ -177,7 +166,7 @@ class LogSegmentTest {
       seg.append(offset, offset, offset, offset, records(offset, "hello"))
       offset += 1
     }
-    assertEquals(offset, seg.nextOffset)
+    assertEquals(offset, seg.readNextOffset)
 
     val expectedNumEntries = numMessages / 2 - 1
     assertEquals(s"Should have $expectedNumEntries time indexes", expectedNumEntries, seg.timeIndex.entries)
@@ -185,7 +174,7 @@ class LogSegmentTest {
     seg.truncateTo(41)
     assertEquals(s"Should have 0 time indexes", 0, seg.timeIndex.entries)
     assertEquals(s"Largest timestamp should be 400", 400L, seg.largestTimestamp)
-    assertEquals(41, seg.nextOffset)
+    assertEquals(41, seg.readNextOffset)
   }
 
   /**
@@ -233,9 +222,9 @@ class LogSegmentTest {
   @Test
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
-    assertEquals(40, seg.nextOffset)
+    assertEquals(40, seg.readNextOffset)
     seg.append(50, 52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
-    assertEquals(53, seg.nextOffset)
+    assertEquals(53, seg.readNextOffset)
   }
 
   /**
@@ -245,12 +234,12 @@ class LogSegmentTest {
   def testChangeFileSuffixes() {
     val seg = createSegment(40)
     val logFile = seg.log.file
-    val indexFile = seg.index.file
+    val indexFile = seg.offsetIndex.file
     seg.changeFileSuffixes("", ".deleted")
     assertEquals(logFile.getAbsolutePath + ".deleted", seg.log.file.getAbsolutePath)
-    assertEquals(indexFile.getAbsolutePath + ".deleted", seg.index.file.getAbsolutePath)
+    assertEquals(indexFile.getAbsolutePath + ".deleted", seg.offsetIndex.file.getAbsolutePath)
     assertTrue(seg.log.file.exists)
-    assertTrue(seg.index.file.exists)
+    assertTrue(seg.offsetIndex.file.exists)
   }
 
   /**
@@ -262,7 +251,7 @@ class LogSegmentTest {
     val seg = createSegment(0)
     for(i <- 0 until 100)
       seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
-    val indexFile = seg.index.file
+    val indexFile = seg.offsetIndex.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
     for(i <- 0 until 100)
@@ -369,7 +358,7 @@ class LogSegmentTest {
     val messagesAppended = 20
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
-      for(i <- 0 until messagesAppended)
+      for (i <- 0 until messagesAppended)
         seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
@@ -380,14 +369,18 @@ class LogSegmentTest {
       seg.recover(new ProducerStateManager(topicPartition, logDir))
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
         seg.log.batches.asScala.map(_.lastOffset).toList)
-      seg.delete()
+      seg.deleteIfExists()
     }
   }
 
-  /* create a segment with   pre allocate */
-  def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
+  private def createSegment(baseOffset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
     val tempDir = TestUtils.tempDir()
-    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists,
+    val logConfig = LogConfig(Map(
+      LogConfig.IndexIntervalBytesProp -> 10,
+      LogConfig.SegmentIndexBytesProp -> 1000,
+      LogConfig.SegmentJitterMsProp -> 0
+    ).asJava)
+    val seg = LogSegment.open(tempDir, baseOffset, logConfig, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists,
       initFileSize = initFileSize, preallocate = preallocate)
     segments += seg
     seg
@@ -409,7 +402,14 @@ class LogSegmentTest {
   @Test
   def testCreateWithInitFileSizeClearShutdown() {
     val tempDir = TestUtils.tempDir()
-    val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
+    val logConfig = LogConfig(Map(
+      LogConfig.IndexIntervalBytesProp -> 10,
+      LogConfig.SegmentIndexBytesProp -> 1000,
+      LogConfig.SegmentJitterMsProp -> 0
+    ).asJava)
+
+    val seg = LogSegment.open(tempDir, baseOffset = 40, logConfig, Time.SYSTEM, fileAlreadyExists = false,
+      initFileSize = 512 * 1024 * 1024, preallocate = true)
 
     val ms = records(50, "hello", "there")
     seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
@@ -425,7 +425,8 @@ class LogSegmentTest {
     //After close, file should be trimmed
     assertEquals(oldSize, seg.log.file.length)
 
-    val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, true,  512*1024*1024, true)
+    val segReopen = LogSegment.open(tempDir, baseOffset = 40, logConfig, Time.SYSTEM, fileAlreadyExists = true,
+      initFileSize = 512 * 1024 * 1024, preallocate = true)
     segments += segReopen
 
     val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1e408ec..2f78ec3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io._
 import java.nio.ByteBuffer
+import java.nio.file.Files
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
@@ -234,8 +235,8 @@ class LogTest {
         topicPartition, producerStateManager, new LogDirFailureChannel(10)) {
 
         override def addSegment(segment: LogSegment): LogSegment = {
-          val wrapper = new LogSegment(segment.log, segment.index, segment.timeIndex, segment.txnIndex, segment.baseOffset,
-            segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
+          val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
+            segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
 
             override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
                               minOneMessage: Boolean): FetchDataInfo = {
@@ -1474,8 +1475,8 @@ class LogTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
-    val lastIndexOffset = log.activeSegment.index.lastOffset
-    val numIndexEntries = log.activeSegment.index.entries
+    val lastIndexOffset = log.activeSegment.offsetIndex.lastOffset
+    val numIndexEntries = log.activeSegment.offsetIndex.entries
     val lastOffset = log.logEndOffset
     // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset).
     val lastTimeIndexOffset = log.logEndOffset - 1
@@ -1489,8 +1490,8 @@ class LogTest {
     def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long) {
       assertEquals(s"Unexpected recovery point", expectedRecoveryPoint, log.recoveryPoint)
       assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset)
-      assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
-      assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+      assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.offsetIndex.lastOffset)
+      assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.offsetIndex.entries)
       assertEquals("Should have same last time index timestamp", lastTimeIndexTimestamp, log.activeSegment.timeIndex.lastEntry.timestamp)
       assertEquals("Should have same last time index offset", lastTimeIndexOffset, log.activeSegment.timeIndex.lastEntry.offset)
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
@@ -1536,7 +1537,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
-    val indexFiles = log.logSegments.map(_.index.file)
+    val indexFiles = log.logSegments.map(_.offsetIndex.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -1547,7 +1548,7 @@ class LogTest {
     // reopen the log
     log = createLog(logDir, logConfig)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
-    assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
+    assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -1568,21 +1569,20 @@ class LogTest {
     val segmentSize = 200
     val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0")
     var log = createLog(logDir, logConfig)
-    for(i <- 0 until numMessages)
+    for (i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
         timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
     // Delete the time index.
-    timeIndexFiles.foreach(_.delete())
+    timeIndexFiles.foreach(file => Files.delete(file.toPath))
 
     // The rebuilt time index should be empty
     log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1)
-    val segArray = log.logSegments.toArray
-    for (i <- segArray.indices.init) {
-      assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
-      assertEquals("The time index file size should be 0", 0, segArray(i).timeIndex.file.length)
+    for (segment <- log.logSegments.init) {
+      assertEquals("The time index should be empty", 0, segment.timeIndex.entries)
+      assertEquals("The time index file size should be 0", 0, segment.timeIndex.file.length)
     }
   }
 
@@ -1597,7 +1597,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
-    val indexFiles = log.logSegments.map(_.index.file)
+    val indexFiles = log.logSegments.map(_.offsetIndex.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -1705,12 +1705,12 @@ class LogTest {
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
     val expectedEntries = msgPerSeg - 1
 
-    assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.index.maxEntries)
+    assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.offsetIndex.maxEntries)
     assertEquals(s"The time index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.timeIndex.maxEntries)
 
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
-    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
+    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.offsetIndex.maxEntries)
     assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries)
 
     mockTime.sleep(msgPerSeg)
@@ -1782,20 +1782,20 @@ class LogTest {
 
     // files should be renamed
     val segments = log.logSegments.toArray
-    val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
+    val oldFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndex.file)
 
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
     assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
-                                                                 segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
+                                                                 segments.forall(_.offsetIndex.file.getName.endsWith(Log.DeletedFileSuffix)))
     assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
-                                                            segments.forall(_.index.file.exists))
+                                                            segments.forall(_.offsetIndex.file.exists))
     assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
 
     // when enough time passes the files should be deleted
-    val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
+    val deletedFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndex.file)
     mockTime.sleep(asyncDeleteMs + 1)
     assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
   }
@@ -1863,7 +1863,7 @@ class LogTest {
       log.close()
 
       // corrupt index and log by appending random bytes
-      TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
+      TestUtils.appendNonsenseToFile(log.activeSegment.offsetIndex.file, TestUtils.random.nextInt(1024) + 1)
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
@@ -2614,8 +2614,8 @@ class LogTest {
 
     // delete all the offset and transaction index files to force recovery
     log.logSegments.foreach { segment =>
-      segment.index.delete()
-      segment.txnIndex.delete()
+      segment.offsetIndex.deleteIfExists()
+      segment.txnIndex.deleteIfExists()
     }
 
     log.close()
@@ -2666,8 +2666,8 @@ class LogTest {
     // delete the last offset and transaction index files to force recovery
     val lastSegment = log.logSegments.last
     val recoveryPoint = lastSegment.baseOffset
-    lastSegment.index.delete()
-    lastSegment.txnIndex.delete()
+    lastSegment.offsetIndex.deleteIfExists()
+    lastSegment.txnIndex.deleteIfExists()
 
     log.close()
 
@@ -2720,8 +2720,8 @@ class LogTest {
     // the producer state from the start of the log
     val lastSegment = log.logSegments.last
     val recoveryPoint = lastSegment.baseOffset
-    lastSegment.index.delete()
-    lastSegment.txnIndex.delete()
+    lastSegment.offsetIndex.deleteIfExists()
+    lastSegment.txnIndex.deleteIfExists()
 
     log.close()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index c6112a1..8520f89 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -93,5 +93,46 @@ class TimeIndexTest extends JUnitSuite {
     file.delete()
     file
   }
+
+  @Test
+  def testSanityCheck(): Unit = {
+    idx.sanityCheck()
+    appendEntries(5)
+    val firstEntry = idx.entry(0)
+    idx.sanityCheck()
+    idx.close()
+
+    var shouldCorruptOffset = false
+    var shouldCorruptTimestamp = false
+    var shouldCorruptLength = false
+    idx = new TimeIndex(idx.file, baseOffset = baseOffset, maxIndexSize = maxEntries * 12) {
+      override def lastEntry = {
+        val superLastEntry = super.lastEntry
+        val offset = if (shouldCorruptOffset) baseOffset - 1 else superLastEntry.offset
+        val timestamp = if (shouldCorruptTimestamp) firstEntry.timestamp - 1 else superLastEntry.timestamp
+        new TimestampOffset(timestamp, offset)
+      }
+      override def length = {
+        val superLength = super.length
+        if (shouldCorruptLength) superLength - 1 else superLength
+      }
+    }
+
+    shouldCorruptOffset = true
+    intercept[CorruptIndexException](idx.sanityCheck())
+    shouldCorruptOffset = false
+
+    shouldCorruptTimestamp = true
+    intercept[CorruptIndexException](idx.sanityCheck())
+    shouldCorruptTimestamp = false
+
+    shouldCorruptLength = true
+    intercept[CorruptIndexException](idx.sanityCheck())
+    shouldCorruptLength = false
+
+    idx.sanityCheck()
+    idx.close()
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 9b90e91..574a8f5 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -56,7 +56,7 @@ class TransactionIndexTest extends JUnitSuite {
     assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[CorruptIndexException])
   def testSanityCheck(): Unit = {
     val abortedTxns = List(
       new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
@@ -134,7 +134,7 @@ class TransactionIndexTest extends JUnitSuite {
     index.truncateTo(50)
     assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
 
-    index.truncate()
+    index.reset()
     assertEquals(List.empty[AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
   }
 

