kafka-commits mailing list archives

From jkr...@apache.org
Subject git commit: KAFKA-615 fsync asynchronous from log roll. Patch reviewed by Jun and Sriram.
Date Tue, 06 Aug 2013 04:41:00 GMT
Updated Branches:
  refs/heads/trunk 493a46613 -> df18fe13a


KAFKA-615 fsync asynchronous from log roll. Patch reviewed by Jun and Sriram.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df18fe13
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df18fe13
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df18fe13

Branch: refs/heads/trunk
Commit: df18fe13ad50bba3dbc7d1db0ecd8b698169ade2
Parents: 493a466
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Jul 17 14:45:03 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Mon Aug 5 21:31:52 2013 -0700

----------------------------------------------------------------------
 config/server.properties                        |  18 +--
 .../main/scala/kafka/cluster/Partition.scala    |   1 +
 .../main/scala/kafka/log/FileMessageSet.scala   |   4 +-
 core/src/main/scala/kafka/log/Log.scala         | 137 ++++++++++---------
 core/src/main/scala/kafka/log/LogManager.scala  |  38 +++--
 core/src/main/scala/kafka/log/LogSegment.scala  |  11 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala |   1 -
 .../main/scala/kafka/server/KafkaConfig.scala   |  14 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   1 +
 .../test/scala/other/kafka/StressTestLog.scala  |   2 +-
 .../other/kafka/TestLinearWriteSpeed.scala      | 116 ++++++++++++----
 .../scala/other/kafka/TestLogPerformance.scala  |  58 --------
 .../test/scala/unit/kafka/log/CleanerTest.scala |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  39 +++++-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  13 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  88 ++++++++----
 .../server/HighwatermarkPersistenceTest.scala   |   1 +
 .../unit/kafka/server/ServerShutdownTest.scala  |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   9 ++
 20 files changed, 331 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
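In short, the patch decouples fsync from the log roll: appends only advance the log end offset in the filesystem cache, roll() hands the flush of the just-completed segment to the background scheduler, and each log now carries a recovery point (the first offset not yet flushed) that is periodically checkpointed per data directory. The standalone Scala sketch below uses illustrative names only; it is not the Kafka code, just a model of the flush/recovery-point bookkeeping under those assumptions:

    import java.util.concurrent.{Executors, TimeUnit}

    object AsyncFlushSketch {
      @volatile var logEndOffset = 0L   // next offset to assign
      @volatile var recoveryPoint = 0L  // first offset not yet flushed to disk
      private val lock = new Object
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      // appends no longer maintain a separate unflushed-message counter
      def append(messages: Int): Unit = lock.synchronized { logEndOffset += messages }

      def unflushedMessages: Long = logEndOffset - recoveryPoint

      // flush everything up to (but not including) offset, then advance the recovery point
      def flush(offset: Long): Unit = lock.synchronized {
        if (offset > recoveryPoint) {
          // the real code fsyncs the segments in [recoveryPoint, offset) here
          recoveryPoint = offset
        }
      }

      // rolling no longer flushes inline; the flush of the old segment is scheduled asynchronously
      def roll(): Unit = {
        val newSegmentBase = logEndOffset
        scheduler.schedule(new Runnable { def run(): Unit = flush(newSegmentBase) }, 0L, TimeUnit.MILLISECONDS)
      }

      def main(args: Array[String]): Unit = {
        append(100); roll(); append(5)
        Thread.sleep(100)
        println(s"recoveryPoint=$recoveryPoint unflushed=$unflushedMessages") // 100 and 5
        scheduler.shutdown()
      }
    }

With log.flush.interval.messages now defaulting to Long.MaxValue, this scheduled flush plus the periodic recovery-point checkpoint is what bounds recovery work after a crash.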


http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 01e0b12..7685879 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -56,23 +56,15 @@ num.partitions=2
 
 ############################# Log Flush Policy #############################
 
-# The following configurations control the flush of data to disk. This is the most
-# important performance knob in kafka.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
-#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
-#    3. Throughput: The flush is generally the most expensive operation. 
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The below configuration can enforce a more aggressive application level
+# fsync policy. This will have a significant performance impact.
 
 # The number of messages to accept before forcing a flush of data to disk
-log.flush.interval.messages=10000
+#log.flush.interval.messages=10000
 
 # The maximum amount of time a message can sit in a log before we force a flush
-log.flush.interval.ms=1000
-
-# Per-topic overrides for log.flush.interval.ms
-#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+#log.flush.interval.ms=1000
 
 ############################# Log Retention Policy #############################
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4b803bb..3323b60 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -197,6 +197,7 @@ class Partition(val topic: String,
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)
           localReplica.log.get.truncateTo(localReplica.highWatermark)
+          logManager.checkpointRecoveryPointOffsets()
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 1afb533..2479a5f 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -52,11 +52,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
       new AtomicInteger(math.min(channel.size().toInt, end) - start)
 
   /* if this is not a slice, update the file pointer to the end of the file */
-  if (!isSlice) {
-    debug("Creating or reloading log segment %s".format(file.getAbsolutePath))
+  if (!isSlice)
     /* set the file position to the last byte in the file */
     channel.position(channel.size)
-  }
 
   /**
    * Create a file message set with no slicing.

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 87151b9..626eb8f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -38,19 +38,16 @@ import com.yammer.metrics.core.Gauge
  * for a given segment.
  * 
  * @param dir The directory in which log segments are created.
- * @param maxSegmentSize The maximum segment size in bytes.
- * @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log.
- * @param flushInterval The number of messages that can be appended to this log before we force a flush of the log.
- * @param rollIntervalMs The time after which we will force the rolling of a new log segment
- * @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down.
- * @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size.
- * @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock 
  * 
  */
 @threadsafe
 class Log(val dir: File,
           @volatile var config: LogConfig,
-          val needsRecovery: Boolean,
+          @volatile var recoveryPoint: Long = 0L,
           val scheduler: Scheduler,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
@@ -59,14 +56,12 @@ class Log(val dir: File,
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
-  /* The current number of unflushed messages appended to the write */
-  private val unflushed = new AtomicInteger(0)
-
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
   /* the actual segments of the log */
-  private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
+  private val segments: ConcurrentNavigableMap[Long,LogSegment] = new ConcurrentSkipListMap[Long, LogSegment]
+  loadSegments()
   
   /* The number of times the log has been truncated */
   private val truncates = new AtomicInteger(0)
@@ -86,10 +81,7 @@ class Log(val dir: File,
   def name  = dir.getName()
 
   /* Load the log segments from the log files on disk */
-  private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = {
-    // open all the segments read-only
-    val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
-
+  private def loadSegments() {
     // create the log directory if it doesn't exist
     dir.mkdirs()
     
@@ -145,46 +137,57 @@ class Log(val dir: File,
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
-        logSegments.put(start, segment)
+        segments.put(start, segment)
       }
     }
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      logSegments.put(0,
-                      new LogSegment(dir = dir, 
+      segments.put(0, new LogSegment(dir = dir, 
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize))
     } else {
+      recoverLog()
       // reset the index size of the currently active log segment to allow more entries
-      val active = logSegments.lastEntry.getValue
-      active.index.resize(config.maxIndexSize)
+      activeSegment.index.resize(config.maxIndexSize)
+    }
 
-      // run recovery on the active segment if necessary
-      if(needsRecovery) {
+    // sanity check the index file of every segment: it should either be empty or have its last offset greater than its base offset.
+    for (s <- logSegments) {
+      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+    }
+  }
+  
+  private def recoverLog() {
+    val lastOffset = try {activeSegment.nextOffset} catch {case _ => -1L}
+    if(lastOffset <= this.recoveryPoint) {
+      info("Log '%s' is fully intact, skipping recovery".format(name))
+      this.recoveryPoint = lastOffset
+      return
+    }
+    val unflushed = logSegments(segments.floorKey(this.recoveryPoint), Long.MaxValue).iterator
+    while(unflushed.hasNext) {
+      val curr = unflushed.next
+      info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
+      val truncatedBytes = 
         try {
-          info("Recovering active segment of %s.".format(name))
-          active.recover(config.maxMessageSize)
+          curr.recover(config.maxMessageSize)
         } catch {
-          case e: InvalidOffsetException =>
-            val startOffset = active.baseOffset
-            warn("Found invalid offset during recovery of the active segment for topic partition " + dir.getName +". Deleting the segment and " +
+          case e: InvalidOffsetException => 
+            val startOffset = curr.baseOffset
+            warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                  "creating an empty one with starting offset " + startOffset)
-            // truncate the active segment to its starting offset
-            active.truncateTo(startOffset)
+            curr.truncateTo(startOffset)
         }
+      if(truncatedBytes > 0) {
+        // we had an invalid message, delete all remaining log
+        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
+        unflushed.foreach(deleteSegment)
       }
     }
-
-    // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset.
-    for (s <- asIterable(logSegments.values)) {
-      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
-              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
-    }
-
-    logSegments
   }
 
   /**
@@ -272,7 +275,8 @@ class Log(val dir: File,
         trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
                 .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
 
-        maybeFlush(appendInfo.shallowCount)
+        if(unflushedMessages >= config.flushInterval)
+          flush()
 
         appendInfo
       }
@@ -285,7 +289,6 @@ class Log(val dir: File,
    * @param firstOffset The first offset in the message set
    * @param lastOffset The last offset in the message set
    * @param codec The codec used in the message set
-   * @param count The number of messages
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
   case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
@@ -452,11 +455,8 @@ class Log(val dir: File,
    * @return The newly rolled segment
    */
   def roll(): LogSegment = {
+    val start = time.nanoseconds
     lock synchronized {
-      // flush the log to ensure that only the active segment needs to be recovered
-      if(!segments.isEmpty())
-        flush()
-  
       val newOffset = logEndOffset
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
@@ -465,7 +465,6 @@ class Log(val dir: File,
         file.delete()
       }
     
-      info("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
       segments.lastEntry() match {
         case null => 
         case entry => entry.getValue.index.trimToValidSize()
@@ -477,33 +476,43 @@ class Log(val dir: File,
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+      
+      // schedule an asynchronous flush of the old segment
+      scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
+      
+      info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
+      
       segment
     }
   }
-
+  
   /**
-   * Flush the log if necessary
-   * @param numberOfMessages The number of messages that are being appended
+   * The number of messages appended to the log since the last flush
    */
-  private def maybeFlush(numberOfMessages : Int) {
-    if(unflushed.addAndGet(numberOfMessages) >= config.flushInterval)
-      flush()
-  }
+  def unflushedMessages() = this.logEndOffset - this.recoveryPoint
+  
+  /**
+   * Flush all log segments
+   */
+  def flush(): Unit = flush(this.logEndOffset)
 
   /**
-   * Flush this log file and associated index to the physical disk
+   * Flush log segments for all offsets up to offset-1
+   * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
-  def flush() : Unit = {
-    if (unflushed.get == 0)
+  def flush(offset: Long) : Unit = {
+    if (offset <= this.recoveryPoint)
       return
-
-    debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " +
-          time.milliseconds + " unflushed = " + unflushed.get)
+    debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
+          time.milliseconds + " unflushed = " + unflushedMessages)
+    for(segment <- logSegments(this.recoveryPoint, offset))
+      segment.flush()
     lock synchronized {
-      activeSegment.flush()
-      unflushed.set(0)
-      lastflushedTime.set(time.milliseconds)
-     }
+      if(offset > this.recoveryPoint) {
+        this.recoveryPoint = offset
+        lastflushedTime.set(time.milliseconds)
+      }
+    }
   }
 
   /**
@@ -534,6 +543,7 @@ class Log(val dir: File,
         deletable.foreach(deleteSegment(_))
         activeSegment.truncateTo(targetOffset)
         this.nextOffset.set(targetOffset)
+        this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
       }
       truncates.getAndIncrement
     }
@@ -553,6 +563,7 @@ class Log(val dir: File,
                                 indexIntervalBytes = config.indexInterval, 
                                 maxIndexSize = config.maxIndexSize))
       this.nextOffset.set(newOffset)
+      this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
       truncates.getAndIncrement
     }
   }
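
Recovery above is bounded by the recovery point: recoverLog() starts from the segment returned by segments.floorKey(this.recoveryPoint) and only scans forward from there. A minimal standalone sketch of that bounding, using illustrative segment labels rather than LogSegment objects:

    import java.util.concurrent.ConcurrentSkipListMap
    import scala.collection.JavaConverters._

    object RecoveryBoundSketch {
      def main(args: Array[String]): Unit = {
        // map of segment base offset -> segment (here just a label)
        val segments = new ConcurrentSkipListMap[Long, String]()
        Seq(0L, 100L, 200L, 300L).foreach(base => segments.put(base, s"segment-$base"))

        val recoveryPoint = 250L  // everything below this offset is known to be on disk
        // the segment containing the recovery point, plus every later segment, gets recovered
        val unflushed = segments.tailMap(segments.floorKey(recoveryPoint)).values.asScala
        println(unflushed.mkString(", "))  // segment-200, segment-300
      }
    }

If a scanned segment turns out to contain an invalid message, it is truncated and all remaining segments are deleted, as recoverLog() above does.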

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9002483..d039f9d 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.server.KafkaConfig
+import kafka.server.OffsetCheckpoint
 
 
 /**
@@ -41,11 +42,12 @@ class LogManager(val logDirs: Array[File],
                  val defaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
                  val flushCheckMs: Long,
+                 val flushCheckpointMs: Long,
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  private val time: Time) extends Logging {
 
-  val CleanShutdownFile = ".kafka_cleanshutdown"
+  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
   private val logCreationLock = new Object
@@ -53,6 +55,7 @@ class LogManager(val logDirs: Array[File],
   
   createAndValidateLogDirs(logDirs)
   private var dirLocks = lockLogDirs(logDirs)
+  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs(logDirs)
   
   private val cleaner: LogCleaner = 
@@ -102,10 +105,7 @@ class LogManager(val logDirs: Array[File],
    */
   private def loadLogs(dirs: Seq[File]) {
     for(dir <- dirs) {
-      /* check if this set of logs was shut down cleanly */
-      val cleanShutDownFile = new File(dir, CleanShutdownFile)
-      val needsRecovery = !cleanShutDownFile.exists
-      cleanShutDownFile.delete
+      val recoveryPoints = this.recoveryPointCheckpoints(dir).read
       /* load the logs */
       val subDirs = dir.listFiles()
       if(subDirs != null) {
@@ -116,7 +116,7 @@ class LogManager(val logDirs: Array[File],
             val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
             val log = new Log(dir, 
                               config,
-                              needsRecovery,
+                              recoveryPoints.getOrElse(topicPartition, 0L),
                               scheduler,
                               time)
             val previous = this.logs.put(topicPartition, log)
@@ -146,6 +146,11 @@ class LogManager(val logDirs: Array[File],
                          delay = InitialTaskDelayMs, 
                          period = flushCheckMs, 
                          TimeUnit.MILLISECONDS)
+      scheduler.schedule("kafka-recovery-point-checkpoint",
+                         checkpointRecoveryPointOffsets,
+                         delay = InitialTaskDelayMs,
+                         period = flushCheckpointMs,
+                         TimeUnit.MILLISECONDS)
     }
     if(cleanerConfig.enableCleaner)
       cleaner.startup()
@@ -160,10 +165,12 @@ class LogManager(val logDirs: Array[File],
       // stop the cleaner first
       if(cleaner != null)
         Utils.swallow(cleaner.shutdown())
+      // flush the logs to ensure latest possible recovery point
+      allLogs.foreach(_.flush())
       // close the logs
       allLogs.foreach(_.close())
-      // mark that the shutdown was clean by creating the clean shutdown marker file
-      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
+      // update the last flush point
+      checkpointRecoveryPointOffsets()
     } finally {
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
@@ -172,6 +179,19 @@ class LogManager(val logDirs: Array[File],
   }
   
   /**
+   * Write out the current recovery point for all logs to a text file in the log directory 
+   * to avoid recovering the whole log on startup.
+   */
+  def checkpointRecoveryPointOffsets() {
+    val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
+    for(dir <- logDirs) {
+        val recoveryPoints = recoveryPointsByDir.get(dir.toString)
+        if(recoveryPoints.isDefined)
+          this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
+    }
+  }
+  
+  /**
    * Get the log if it exists, otherwise return None
    */
   def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
@@ -200,7 +220,7 @@ class LogManager(val logDirs: Array[File],
       dir.mkdirs()
       log = new Log(dir, 
                     config,
-                    needsRecovery = false,
+                    recoveryPoint = 0L,
                     scheduler,
                     time)
       logs.put(topicAndPartition, log)
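
The recovery-point-offset-checkpoint file written above is a small per-directory text file mapping each topic/partition to its last flushed offset; on startup it is read back so only genuinely unflushed data needs recovery. A self-contained sketch of that round trip, with an assumed plain-text layout rather than the real kafka.server.OffsetCheckpoint format:

    import java.io.{File, PrintWriter}
    import scala.io.Source

    object CheckpointSketch {
      def write(file: File, offsets: Map[(String, Int), Long]): Unit = {
        val out = new PrintWriter(file)
        try offsets.foreach { case ((topic, partition), offset) => out.println(s"$topic $partition $offset") }
        finally out.close()
      }

      def read(file: File): Map[(String, Int), Long] = {
        if (!file.exists) return Map.empty
        val src = Source.fromFile(file)
        try src.getLines().map { line =>
          val Array(topic, partition, offset) = line.split(" ")
          (topic, partition.toInt) -> offset.toLong
        }.toMap
        finally src.close()
      }

      def main(args: Array[String]): Unit = {
        val f = File.createTempFile("recovery-point-offset-checkpoint", "")
        write(f, Map(("test-a", 1) -> 50L, ("test-b", 1) -> 100L))
        println(read(f))  // Map((test-a,1) -> 50, (test-b,1) -> 100)
        f.delete()
      }
    }

The real checkpoint is rewritten every log.flush.offset.checkpoint.interval.ms (default 60000) and again on shutdown, replacing the old .kafka_cleanshutdown marker file.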

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 38e6cd5..fe39d79 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,14 +147,17 @@ class LogSegment(val log: FileMessageSet,
   }
   
   /**
-   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
+   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
    * 
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
+   * 
+   * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(maxMessageSize: Int) {
+  def recover(maxMessageSize: Int): Int = {
     index.truncate()
+    index.resize(index.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
     val iter = log.iterator(maxMessageSize)
@@ -181,9 +184,9 @@ class LogSegment(val log: FileMessageSet,
         logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
-    if(truncated > 0)
-      warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath))
     log.truncateTo(validBytes)
+    index.trimToValidSize()
+    truncated
   }
 
   override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index fbc728c..afab848 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -268,7 +268,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def resize(newSize: Int) {
     this synchronized {
-      flush()
       val raf = new RandomAccessFile(file, "rws")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
       try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f7d8b03..ebbbdea 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -145,19 +145,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue))
+  val logFlushIntervalMessages = props.getLongInRange("log.flush.interval.messages", Long.MaxValue, (1, Long.MaxValue))
 
   /* the amount of time to wait before deleting a file from the filesystem */
   val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
 
-  /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-  val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
-
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-  val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms",  3000)
+  val logFlushSchedulerIntervalMs = props.getLong("log.flush.scheduler.interval.ms",  Long.MaxValue)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-  val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
+  val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)
+  
+  /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
+  val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
 
   /* enable auto creation of topic on the server */
   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
@@ -223,4 +223,4 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   
   /* the maximum size for a metadata entry associated with an offset commit */
   val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2176958..a925ae1 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -277,6 +277,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    defaultConfig = defaultLogConfig,
                    cleanerConfig = cleanerConfig,
                    flushCheckMs = config.logFlushSchedulerIntervalMs,
+                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,
                    scheduler = kafkaScheduler,
                    time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index c6e7a57..8fcd068 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -37,7 +37,7 @@ object StressTestLog {
                       config = LogConfig(segmentSize = 64*1024*1024,
                                          maxMessageSize = Int.MaxValue,
                                          maxIndexSize = 1024*1024),
-                      needsRecovery = false,
+                      recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
     val writer = new WriterThread(log)

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 36d52e7..eeb8c88 100644
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,9 +20,16 @@ package kafka
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.util.Random
+import kafka.log._
+import kafka.utils._
+import kafka.message._
 import scala.math._
 import joptsimple._
 
+/**
+ * This test does linear writes using either a kafka log or a file and measures throughput and latency.
+ */
 object TestLinearWriteSpeed {
 
   def main(args: Array[String]): Unit = {
@@ -32,7 +39,7 @@ object TestLinearWriteSpeed {
                            .describedAs("path")
                            .ofType(classOf[java.lang.String])
                            .defaultsTo(System.getProperty("java.io.tmpdir"))
-    val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
+    val bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
                            .withRequiredArg
                            .describedAs("num_bytes")
                            .ofType(classOf[java.lang.Long])
@@ -40,7 +47,12 @@ object TestLinearWriteSpeed {
                            .withRequiredArg
                            .describedAs("num_bytes")
                            .ofType(classOf[java.lang.Integer])
-    val filesOpt = parser.accepts("files", "REQUIRED: The number of files.")
+    val messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.")
+                           .withRequiredArg
+                           .describedAs("num_bytes")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024)
+    val filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.")
                            .withRequiredArg
                            .describedAs("num_files")
                            .ofType(classOf[java.lang.Integer])
@@ -55,7 +67,19 @@ object TestLinearWriteSpeed {
                            .describedAs("mb")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(Integer.MAX_VALUE)
-   val mmapOpt = parser.accepts("mmap", "Mmap file.")
+   val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+                           .withRequiredArg()
+                           .describedAs("message_count")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(Long.MaxValue)
+   val compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+                            .withRequiredArg
+                            .describedAs("codec")
+                            .ofType(classOf[java.lang.String])
+                            .defaultsTo(NoCompressionCodec.name)
+   val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.")
+   val channelOpt = parser.accepts("channel", "Do writes to file channels.")
+   val logOpt = parser.accepts("log", "Do writes to kafka logs.")
                           
     val options = parser.parse(args : _*)
     
@@ -68,26 +92,35 @@ object TestLinearWriteSpeed {
     }
 
     var bytesToWrite = options.valueOf(bytesOpt).longValue
-    val mmap = options.has(mmapOpt)
     val bufferSize = options.valueOf(sizeOpt).intValue
     val numFiles = options.valueOf(filesOpt).intValue
     val reportingInterval = options.valueOf(reportingIntervalOpt).longValue
     val dir = options.valueOf(dirOpt)
     val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L
     val buffer = ByteBuffer.allocate(bufferSize)
-    while(buffer.hasRemaining)
-      buffer.put(123.asInstanceOf[Byte])
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    val flushInterval = options.valueOf(flushIntervalOpt).longValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt))
+    val rand = new Random
+    rand.nextBytes(buffer.array)
+    val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*)
     
     val writables = new Array[Writable](numFiles)
+    val scheduler = new KafkaScheduler(1)
+    scheduler.startup()
     for(i <- 0 until numFiles) {
-      val file = new File(dir, "kafka-test-" + i + ".dat")
-      file.deleteOnExit()
-      val raf = new RandomAccessFile(file, "rw")
-      raf.setLength(bytesToWrite / numFiles)
-      if(mmap)
-        writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()))
-      else
-        writables(i) = new ChannelWritable(raf.getChannel())
+      if(options.has(mmapOpt)) {
+        writables(i) = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer)
+      } else if(options.has(channelOpt)) {
+        writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer)
+      } else if(options.has(logOpt)) {
+        val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect
+        writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet)
+      } else {
+        System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") 
+        System.exit(1)
+      }
     }
     bytesToWrite = (bytesToWrite / numFiles) * numFiles
     
@@ -101,15 +134,14 @@ object TestLinearWriteSpeed {
     var totalWritten = 0L
     var lastReport = beginTest
     while(totalWritten + bufferSize < bytesToWrite) {
-      buffer.rewind()
       val start = System.nanoTime
-      writables((count % numFiles).toInt.abs).write(buffer)
+      val writeSize = writables((count % numFiles).toInt.abs).write()
       val ellapsed = System.nanoTime - start
       maxLatency = max(ellapsed, maxLatency)
       totalLatency += ellapsed
-      written += bufferSize
+      written += writeSize
       count += 1
-      totalWritten += bufferSize
+      totalWritten += writeSize
       if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
         val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
         val mb = written / (1024.0*1024.0)
@@ -118,7 +150,7 @@ object TestLinearWriteSpeed {
         written = 0
         maxLatency = 0L
         totalLatency = 0L
-      } else if(written > maxThroughputBytes) {
+      } else if(written > maxThroughputBytes * (reportingInterval / 1000.0)) {
         // if we have written enough, just sit out this reporting interval
         val lastReportMs = lastReport / (1000*1000)
         val now = System.nanoTime / (1000*1000)
@@ -129,21 +161,53 @@ object TestLinearWriteSpeed {
     }
     val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
     println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
+    scheduler.shutdown()
   }
   
   trait Writable {
-    def write(buffer: ByteBuffer)
+    def write(): Int
+    def close()
   }
   
-  class MmapWritable(val buffer: ByteBuffer) extends Writable {
-    def write(b: ByteBuffer) {
-      buffer.put(b)
+  class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable {
+    file.deleteOnExit()
+    val raf = new RandomAccessFile(file, "rw")
+    raf.setLength(size)
+    val buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
+    def write(): Int = {
+      buffer.put(content)
+      content.rewind()
+      content.limit
+    }
+    def close() {
+      raf.close()
     }
   }
   
-  class ChannelWritable(val channel: FileChannel) extends Writable {
-    def write(b: ByteBuffer) {
-      channel.write(b)
+  class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable {
+    file.deleteOnExit()
+    val raf = new RandomAccessFile(file, "rw")
+    val channel = raf.getChannel
+    def write(): Int = {
+      channel.write(content)
+      content.rewind()
+      content.limit
+    }
+    def close() {
+      raf.close()
+    }
+  }
+  
+  class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
+    Utils.rm(dir)
+    val log = new Log(dir, config, 0L, scheduler, SystemTime)
+    def write(): Int = {
+      log.append(messages, true)
+      messages.sizeInBytes
+    }
+    def close() {
+      log.close()
+      Utils.rm(log.dir)
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
deleted file mode 100644
index d91011e..0000000
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.message._
-import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler}
-import kafka.server.KafkaConfig
-
-object TestLogPerformance {
-
-  def main(args: Array[String]): Unit = {
-    if(args.length < 4)
-      Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
-    val numMessages = args(0).toInt
-    val messageSize = args(1).toInt
-    val batchSize = args(2).toInt
-    val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
-    val props = TestUtils.createBrokerConfig(0, -1)
-    val config = new KafkaConfig(props)
-    val dir = TestUtils.tempDir()
-    val scheduler = new KafkaScheduler(1)
-    val logConfig = LogConfig()
-    val log = new Log(dir, logConfig, needsRecovery = false, scheduler = scheduler, time = SystemTime)
-    val bytes = new Array[Byte](messageSize)
-    new java.util.Random().nextBytes(bytes)
-    val message = new Message(bytes)
-    val messages = new Array[Message](batchSize)
-    for(i <- 0 until batchSize)
-      messages(i) = message
-    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages: _*)
-    val numBatches = numMessages / batchSize
-    val start = System.currentTimeMillis()
-    for(i <- 0 until numBatches)
-      log.append(messageSet)
-    log.close()
-    val elapsed = (System.currentTimeMillis() - start) / 1000.0
-    val writtenBytes = MessageSet.entrySize(message) * numMessages
-    println("message size = " + MessageSet.entrySize(message))
-    println("MB/sec: " + writtenBytes / elapsed / (1024.0 * 1024.0))
-    Utils.rm(dir)
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 4619d86..5a312bf 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -195,7 +195,7 @@ class CleanerTest extends JUnitSuite {
   }
   
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
   
   def makeCleaner(capacity: Int) = 
     new Cleaner(id = 0, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 15e9b60..1de3ef0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -102,7 +102,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       dir.mkdirs()
       val log = new Log(dir = dir,
                         LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
-                        needsRecovery = false,
+                        recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
       logs.put(TopicAndPartition("log", i), log)      

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 6916df4..b4bee33 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.server.KafkaConfig
 import kafka.common._
 import kafka.utils._
+import kafka.server.OffsetCheckpoint
 
 class LogManagerTest extends JUnit3Suite {
 
@@ -40,7 +41,15 @@ class LogManagerTest extends JUnit3Suite {
   override def setUp() {
     super.setUp()
     logDir = TestUtils.tempDir()
-    logManager = new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(logDirs = Array(logDir), 
+                                topicConfigs = Map(), 
+                                defaultConfig = logConfig, 
+                                cleanerConfig = cleanerConfig, 
+                                flushCheckMs = 1000L, 
+                                flushCheckpointMs = 100000L, 
+                                retentionCheckMs = 1000L, 
+                                scheduler = time.scheduler, 
+                                time = time)
     logManager.startup
     logDir = logManager.logDirs(0)
   }
@@ -116,7 +125,7 @@ class LogManagerTest extends JUnit3Suite {
     logManager.shutdown()
 
     val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
     logManager.startup
 
     // create a log
@@ -156,7 +165,7 @@ class LogManagerTest extends JUnit3Suite {
   def testTimeBasedFlush() {
     logManager.shutdown()
     val config = logConfig.copy(flushMs = 1000)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
@@ -178,7 +187,7 @@ class LogManagerTest extends JUnit3Suite {
                      TestUtils.tempDir(), 
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -194,10 +203,30 @@ class LogManagerTest extends JUnit3Suite {
    */
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
+      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 
     }
   }
+  
+  /**
+   * Test that recovery points are correctly written out to disk
+   */
+  def testCheckpointRecoveryPoints() {
+    val topicA = TopicAndPartition("test-a", 1)
+    val topicB = TopicAndPartition("test-b", 1)
+    val logA = this.logManager.createLog(topicA, logConfig)
+    val logB = this.logManager.createLog(topicB, logConfig)
+    for(i <- 0 until 50) 
+      logA.append(TestUtils.singleMessageSet("test".getBytes()))
+    for(i <- 0 until 100)
+      logB.append(TestUtils.singleMessageSet("test".getBytes()))
+    logA.flush()
+    logB.flush()
+    logManager.checkpointRecoveryPointOffsets()
+    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
+    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 3a4b41b..5f2c2e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -201,7 +201,7 @@ class LogSegmentTest extends JUnit3Suite {
     for(i <- 0 until 100)
       seg.append(i, messages(i, i.toString))
     val indexFile = seg.index.file
-    writeNonsense(indexFile, 5, indexFile.length.toInt)
+    TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
       assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
@@ -221,20 +221,11 @@ class LogSegmentTest extends JUnit3Suite {
       val offsetToBeginCorruption = rand.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
       val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
-      writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position)
+      TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
       seg.delete()
     }
   }
   
-  def writeNonsense(fileName: File, position: Long, size: Int) {
-    val file = new RandomAccessFile(fileName, "rw")
-    file.seek(position)
-    val rand = new Random
-    for(i <- 0 until size)
-      file.writeByte(rand.nextInt(255))
-    file.close()
-  }
-  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7d41938..b7f43e2 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -67,7 +67,7 @@ class LogTest extends JUnitSuite {
     // create a log
     val log = new Log(logDir, 
                       logConfig.copy(segmentMs = 1 * 60 * 60L), 
-                      needsRecovery = false, 
+                      recoveryPoint = 0L, 
                       scheduler = time.scheduler, 
                       time = time)
     time.sleep(log.config.segmentMs + 1)
@@ -102,7 +102,7 @@ class LogTest extends JUnitSuite {
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -118,7 +118,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
     log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
@@ -127,7 +127,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
     
     for(i <- 0 until messages.length)
@@ -146,7 +146,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
     
@@ -169,7 +169,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReadAtLogGap() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 300), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
     
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -189,7 +189,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -212,7 +212,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
     messageSets.foreach(log.append(_))
@@ -228,6 +228,11 @@ class LogTest extends JUnitSuite {
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
     assertEquals("Should be no more messages", 0, lastRead.size)
+    
+    // check that rolling the log forced a flush of the log--the flush is async so retry in case of failure
+    TestUtils.retry(1000L){
+      assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
+    }
   }
   
   /**
@@ -236,7 +241,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 10), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
     
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -259,7 +264,7 @@ class LogTest extends JUnitSuite {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
+      val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
       
@@ -293,7 +298,7 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), needsRecovery = false, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -316,22 +321,23 @@ class LogTest extends JUnitSuite {
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
     val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
-    var log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
     val numIndexEntries = log.activeSegment.index.entries
+    val lastOffset = log.logEndOffset
     log.close()
     
-    log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
+    log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
     
     // test recovery case
-    log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
+    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
@@ -346,7 +352,7 @@ class LogTest extends JUnitSuite {
     // publish the messages and close the log
     val numMessages = 200
     val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
-    var log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -356,7 +362,7 @@ class LogTest extends JUnitSuite {
     indexFiles.foreach(_.delete())
     
     // reopen the log
-    log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)    
+    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)    
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
       assertEquals(i, log.read(i, 100, None).head.offset)
@@ -374,7 +380,7 @@ class LogTest extends JUnitSuite {
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -427,7 +433,7 @@ class LogTest extends JUnitSuite {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
     val config = logConfig.copy(segmentSize = segmentSize)
-    val log = new Log(logDir, config, needsRecovery = false, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     for (i<- 1 to msgPerSeg)
       log.append(set)
@@ -457,7 +463,7 @@ class LogTest extends JUnitSuite {
                       logConfig.copy(segmentSize = set.sizeInBytes * 5, 
                                      maxIndexSize = 1000, 
                                      indexInterval = 1),
-                      needsRecovery = false,
+                      recoveryPoint = 0L,
                       time.scheduler,
                       time)
     
@@ -484,7 +490,7 @@ class LogTest extends JUnitSuite {
     // create a log
     var log = new Log(logDir, 
                       config,
-                      needsRecovery = true,
+                      recoveryPoint = 0L,
                       time.scheduler,
                       time)
     
@@ -494,7 +500,7 @@ class LogTest extends JUnitSuite {
     log.close()
     log = new Log(logDir, 
                   config,
-                  needsRecovery = true,
+                  recoveryPoint = 0L,
                   time.scheduler,
                   time)
     log.truncateTo(3)
@@ -515,7 +521,7 @@ class LogTest extends JUnitSuite {
                                 indexInterval = 10000)
     val log = new Log(logDir,
                       config,
-                      needsRecovery = true,                      
+                      recoveryPoint = 0L,                      
                       time.scheduler,
                       time)
     
@@ -550,7 +556,7 @@ class LogTest extends JUnitSuite {
     val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000)
     var log = new Log(logDir,
                       config,
-                      needsRecovery = false,
+                      recoveryPoint = 0L,
                       time.scheduler,
                       time)
     
@@ -563,7 +569,7 @@ class LogTest extends JUnitSuite {
     
     log = new Log(logDir, 
                   config,
-                  needsRecovery = false,
+                  recoveryPoint = 0L,
                   time.scheduler,
                   time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
@@ -573,7 +579,7 @@ class LogTest extends JUnitSuite {
   def testAppendMessageWithNullPayload() {
     var log = new Log(logDir,
                       LogConfig(),
-                      needsRecovery = false,
+                      recoveryPoint = 0L,
                       time.scheduler,
                       time)
     log.append(new ByteBufferMessageSet(new Message(bytes = null)))
@@ -582,4 +588,36 @@ class LogTest extends JUnitSuite {
     assertTrue("Message payload should be null.", ms.head.message.isNull)
   }
   
+  @Test
+  def testCorruptLog() {    
+    // append some messages to create some segments
+    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val recoveryPoint = 50L
+    for(iteration <- 0 until 10) {
+      // create a log and write some messages to it
+      var log = new Log(logDir,
+                        config,
+                        recoveryPoint = 0L,
+                        time.scheduler,
+                        time)
+      for(i <- 0 until 100)
+        log.append(set)
+      val seg = log.logSegments(0, recoveryPoint).last
+      val index = seg.index
+      val messages = seg.log
+      val filePosition = messages.searchFor(recoveryPoint, 0).position
+      val indexPosition = index.lookup(recoveryPoint).position
+      log.close()
+      
+      // corrupt file
+      TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
+      TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
+      
+      // attempt recovery
+      log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
+      assertEquals(recoveryPoint, log.logEndOffset)
+    }
+  }
+  
 }

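The test changes above replace the boolean needsRecovery constructor argument with a
recoveryPoint offset: everything below the recovery point is known to be safely on disk, so
only data appended after it needs to be re-validated on startup, and recoveryPoint = 0L
reproduces the old "full recovery" behaviour. A minimal standalone sketch of that idea,
using a simplified Segment case class and a hypothetical segmentsToRecover helper rather
than the actual kafka.log.Log internals:

    object RecoverySketch {
      case class Segment(baseOffset: Long, nextOffset: Long)

      // Segments whose data extends past the recovery point must be re-scanned on startup.
      def segmentsToRecover(segments: Seq[Segment], recoveryPoint: Long): Seq[Segment] =
        segments.filter(_.nextOffset > recoveryPoint)

      def main(args: Array[String]) {
        val segs = Seq(Segment(0, 50), Segment(50, 100), Segment(100, 120))
        // recoveryPoint = 100 => only the active segment is re-validated
        println(segmentsToRecover(segs, 100L).map(_.baseOffset))  // List(100)
        // recoveryPoint = 0 => scan everything, like the old needsRecovery = true
        println(segmentsToRecover(segs, 0L).map(_.baseOffset))    // List(0, 50, 100)
      }
    }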
http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 456e538..02c188a 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -37,6 +37,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
                                                          defaultConfig = LogConfig(),
                                                          cleanerConfig = CleanerConfig(),
                                                          flushCheckMs = 30000,
+                                                         flushCheckpointMs = 10000L,
                                                          retentionCheckMs = 30000,
                                                          scheduler = new KafkaScheduler(1),
                                                          time = new MockTime))

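The new flushCheckpointMs argument to LogManager presumably sets the interval at which the
per-partition recovery points are checkpointed to disk. A sketch of that kind of periodic
task, using a plain JDK scheduled executor and a hypothetical checkpointRecoveryPoints
callback in place of KafkaScheduler (illustrative only):

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    object CheckpointSchedulerSketch {
      // Run the supplied checkpoint action every flushCheckpointMs milliseconds.
      def start(flushCheckpointMs: Long)(checkpointRecoveryPoints: () => Unit): ScheduledExecutorService = {
        val scheduler = Executors.newSingleThreadScheduledExecutor()
        scheduler.scheduleAtFixedRate(new Runnable {
          def run() { checkpointRecoveryPoints() }
        }, flushCheckpointMs, flushCheckpointMs, TimeUnit.MILLISECONDS)
        scheduler
      }
    }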
http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 298ba71..20fe93e 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -55,11 +55,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 
-    // do a clean shutdown and check that the clean shudown file is written out
+    // do a clean shutdown and check that the offset checkpoint file exists and is non-empty
     server.shutdown()
     for(logDir <- config.logDirs) {
-      val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile)
-      assertTrue(cleanShutDownFile.exists)
+      val offsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile)
+      assertTrue(offsetCheckpointFile.exists)
+      assertTrue(offsetCheckpointFile.length() > 0)
     }
     producer.close()
     

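Instead of the clean-shutdown marker file, the test now checks that the recovery-point
checkpoint file exists and is non-empty after a clean shutdown. A simplified sketch of such
a checkpoint file, with one topic/partition/offset entry per line; the real on-disk format
is whatever Kafka's checkpoint code writes, so treat this as illustrative only:

    import java.io.{File, PrintWriter}
    import scala.io.Source

    object CheckpointFileSketch {
      // Write one "topic partition offset" line per partition.
      def write(file: File, recoveryPoints: Map[(String, Int), Long]) {
        val writer = new PrintWriter(file)
        try {
          for(((topic, partition), offset) <- recoveryPoints)
            writer.println(topic + " " + partition + " " + offset)
        } finally {
          writer.close()
        }
      }

      // Read the file back into a map.
      def read(file: File): Map[(String, Int), Long] =
        Source.fromFile(file).getLines().map { line =>
          val Array(topic, partition, offset) = line.split(" ")
          ((topic, partition.toInt), offset.toLong)
        }.toMap
    }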
http://git-wip-us.apache.org/repos/asf/kafka/blob/df18fe13/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 148bb4b..10712e2 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -515,6 +515,15 @@ object TestUtils extends Logging {
         servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
   }
   
+  def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
+    val file = new RandomAccessFile(fileName, "rw")
+    file.seek(position)
+    val rand = new Random
+    for(i <- 0 until size)
+      file.writeByte(rand.nextInt(255))
+    file.close()
+  }
+  
 }
 
 object TestZKUtils {


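The new flush assertion in LogTest relies on TestUtils.retry because the roll-triggered
flush runs on the background scheduler. The retry helper itself is not part of this diff;
below is a standalone sketch of the retry-until-timeout pattern it provides, not Kafka's
actual implementation:

    object RetrySketch {
      // Re-run the assertion until it passes or maxWaitMs elapses, then rethrow.
      def retry(maxWaitMs: Long)(assertion: => Unit) {
        val deadline = System.currentTimeMillis + maxWaitMs
        var done = false
        while(!done) {
          try {
            assertion
            done = true
          } catch {
            case e: AssertionError =>
              if(System.currentTimeMillis > deadline)
                throw e
              Thread.sleep(50)
          }
        }
      }
    }

    // Usage, as in the new testLogRolls assertion:
    //   RetrySketch.retry(1000L) {
    //     assertTrue("Log roll should have forced a flush", log.recoveryPoint >= log.activeSegment.baseOffset)
    //   }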