kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6530: Use actual first offset of message set when rolling log segment (#4660)
Date Sat, 17 Mar 2018 17:29:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae31ee6  KAFKA-6530: Use actual first offset of message set when rolling log segment (#4660)
ae31ee6 is described below

commit ae31ee63dc3bdb9068543791bea44a12a97aa928
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Sat Mar 17 10:29:42 2018 -0700

    KAFKA-6530: Use actual first offset of message set when rolling log segment (#4660)
    
    Use the exact first offset of the message set when rolling a log segment. This is possible for message format V2 and beyond without any performance penalty, because the first offset is stored in the batch header. This augments the fix made in KAFKA-4451 by avoiding the heuristic for V2 and beyond messages.
    
    Added unit tests to simulate cases where the segment needs to roll because of overflow in index offsets. Verified that the new segment created in these cases uses the exact first offset, instead of the heuristic used previously.
---
 .../apache/kafka/common/record/MemoryRecords.java  |   4 +
 core/src/main/scala/kafka/log/Log.scala            |  77 +++++++++-----
 core/src/main/scala/kafka/log/LogCleaner.scala     |   3 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |  12 +--
 .../main/scala/kafka/server/ReplicaManager.scala   |  10 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  61 ++++++++---
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |   8 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  29 ++---
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  52 ++++-----
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 117 +++++++++++++++++----
 11 files changed, 255 insertions(+), 120 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 932d4b6..da6b68c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -510,6 +510,10 @@ public class MemoryRecords extends AbstractRecords {
                 records);
     }
 
+    public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
+    }
+
     public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... records) {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f0050f5..cc69337 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -47,11 +47,11 @@ import java.lang.{Long => JLong}
 import java.util.regex.Pattern
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+  val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
     RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
-    LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+    LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
       RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
@@ -59,7 +59,7 @@ object LogAppendInfo {
  * Struct to hold various quantities we compute about each message set before appending to the log
  *
  * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
- *                    to the follower. In that case, this will be the last offset for performance reasons.
+ *                    to the follower.
  * @param lastOffset The last offset in the message set
  * @param maxTimestamp The maximum timestamp of the message set.
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
@@ -72,7 +72,7 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  */
-case class LogAppendInfo(var firstOffset: Long,
+case class LogAppendInfo(var firstOffset: Option[Long],
                          var lastOffset: Long,
                          var maxTimestamp: Long,
                          var offsetOfMaxTimestamp: Long,
@@ -83,7 +83,24 @@ case class LogAppendInfo(var firstOffset: Long,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean)
+                         offsetsMonotonic: Boolean) {
+  /**
+    * Get the first offset if it exists, else get the last offset.
+    * @return The offset of first message if it exists; else offset of the last message.
+    */
+  def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+
+  /**
+    * Get the (maximum) number of messages described by LogAppendInfo
+    * @return Maximum possible number of messages described by LogAppendInfo
+    */
+  def numMessages: Long = {
+    firstOffset match {
+      case Some(firstOffsetVal) if (firstOffsetVal >= 0 && lastOffset >= 0) => (lastOffset - firstOffsetVal + 1)
+      case _ => 0
+    }
+  }
+}
 
 /**
  * A class used to hold useful metadata about a completed transaction. This is used to build
@@ -653,7 +670,7 @@ class Log(@volatile var dir: File,
         if (assignOffsets) {
           // assign offsets to the message set
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
-          appendInfo.firstOffset = offset.value
+          appendInfo.firstOffset = Some(offset.value)
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
@@ -695,7 +712,7 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
+          if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
         }
 
@@ -715,7 +732,7 @@ class Log(@volatile var dir: File,
         // validate the idempotent/transactional state of the producers and collect some metadata
         val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
         maybeDuplicate.foreach { duplicate =>
-          appendInfo.firstOffset = duplicate.firstOffset
+          appendInfo.firstOffset = Some(duplicate.firstOffset)
           appendInfo.lastOffset = duplicate.lastOffset
           appendInfo.logAppendTime = duplicate.timestamp
           appendInfo.logStartOffset = logStartOffset
@@ -723,17 +740,14 @@ class Log(@volatile var dir: File,
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
-          maxTimestampInMessages = appendInfo.maxTimestamp,
-          maxOffsetInMessages = appendInfo.lastOffset)
+        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
         val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOffset,
+          messageOffset = appendInfo.firstOrLastOffset,
           segmentBaseOffset = segment.baseOffset,
           relativePositionInSegment = segment.size)
 
-        segment.append(firstOffset = appendInfo.firstOffset,
-          largestOffset = appendInfo.lastOffset,
+        segment.append(largestOffset = appendInfo.lastOffset,
           largestTimestamp = appendInfo.maxTimestamp,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
@@ -761,8 +775,8 @@ class Log(@volatile var dir: File,
         // update the first unstable offset (which is used to compute LSO)
         updateFirstUnstableOffset()
 
-        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-          .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
+        trace(s"Appended message set to log ${this.name} with last offset: ${appendInfo.lastOffset}, " +
+              s"first offset: ${appendInfo.firstOffset}, next offset: ${nextOffsetMetadata.messageOffset}, and messages: $validRecords")
 
         if (unflushedMessages >= config.flushInterval)
           flush()
@@ -859,12 +873,13 @@ class Log(@volatile var dir: File,
   private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
-    var firstOffset = -1L
+    var firstOffset: Option[Long] = None
     var lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
+    var readFirstMessage = false
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -876,8 +891,12 @@ class Log(@volatile var dir: File,
       // For magic version 2, we can get the first offset directly from the batch header.
       // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
       // case, validation will be more lenient.
-      if (firstOffset < 0)
-        firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset
+      // Also indicate whether we have the accurate first offset or not
+      if (!readFirstMessage) {
+        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          firstOffset = Some(batch.baseOffset)
+        readFirstMessage = true
+      }
 
       // check that offsets are monotonically increasing
       if (lastOffset >= batch.lastOffset)
@@ -1268,8 +1287,8 @@ class Log(@volatile var dir: File,
   /**
    * Roll the log over to a new empty log segment if necessary.
    *
-   * @param messagesSize The messages set size in bytes
-   * @param maxTimestampInMessages The maximum timestamp in the messages.
+   * @param messagesSize The messages set size in bytes.
+   * @param appendInfo log append information
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
@@ -1279,14 +1298,19 @@ class Log(@volatile var dir: File,
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
+  private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = {
     val segment = activeSegment
     val now = time.milliseconds
+
+    val maxTimestampInMessages = appendInfo.maxTimestamp
+    val maxOffsetInMessages = appendInfo.lastOffset
+
     if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
       debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
           s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
           s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
           s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
+
       /*
         maxOffsetInMessages - Integer.MAX_VALUE is a heuristic value for the first offset in the set of messages.
         Since the offset in messages will not differ by more than Integer.MAX_VALUE, this is guaranteed <= the real
@@ -1296,8 +1320,13 @@ class Log(@volatile var dir: File,
         Integer.MAX_VALUE.toLong + 2 or more.  In this case, the prior behavior would roll a new log segment whose
         base offset was too low to contain the next message.  This edge case is possible when a replica is recovering a
         highly compacted topic from scratch.
-       */
-      roll(maxOffsetInMessages - Integer.MAX_VALUE)
+        Note that this is only required for pre-V2 message formats because these do not store the first message offset
+        in the header.
+      */
+      appendInfo.firstOffset match {
+        case Some(firstOffset) => roll(firstOffset)
+        case None => roll(maxOffsetInMessages - Integer.MAX_VALUE)
+      }
     } else {
       segment
     }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 07e8440..0ee9942 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -614,8 +614,7 @@ private[log] class Cleaner(val id: Int,
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
-        dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
-          largestOffset = result.maxOffset,
+        dest.append(largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5970f42..5130b28 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -112,7 +112,6 @@ class LogSegment private[log] (val log: FileRecords,
    *
    * It is assumed this method is being called from within a lock.
    *
-   * @param firstOffset The first offset in the message set.
    * @param largestOffset The last offset in the message set
    * @param largestTimestamp The largest timestamp in the message set.
    * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
@@ -120,21 +119,20 @@ class LogSegment private[log] (val log: FileRecords,
    * @return the physical position in the file of the appended records
    */
   @nonthreadsafe
-  def append(firstOffset: Long,
-             largestOffset: Long,
+  def append(largestOffset: Long,
              largestTimestamp: Long,
              shallowOffsetOfMaxTimestamp: Long,
              records: MemoryRecords): Unit = {
     if (records.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
-          .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
+      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
+            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
       val physicalPosition = log.sizeInBytes()
       if (physicalPosition == 0)
         rollingBasedTimestamp = Some(largestTimestamp)
       // append the messages
       require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
       val appendedBytes = log.append(records)
-      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
+      trace(s"Appended $appendedBytes to ${log.file()} at end offset $largestOffset")
       // Update the in memory max timestamp and corresponding offset.
       if (largestTimestamp > maxTimestampSoFar) {
         maxTimestampSoFar = largestTimestamp
@@ -142,7 +140,7 @@ class LogSegment private[log] (val log: FileRecords,
       }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        offsetIndex.append(firstOffset, physicalPosition)
+        offsetIndex.append(largestOffset, physicalPosition)
         timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
         bytesSinceLastIndexEntry = 0
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 470842e..d4abc11 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -474,7 +474,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
+                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
       processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
@@ -502,7 +502,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
         topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
       }
       responseCallback(responseStatus)
     }
@@ -747,11 +747,7 @@ class ReplicaManager(val config: KafkaConfig,
               .format(topicPartition, localBrokerId))
           }
 
-          val numAppendedMessages =
-            if (info.firstOffset == -1L || info.lastOffset == -1L)
-              0
-            else
-              info.lastOffset - info.firstOffset + 1
+          val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
           brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 1710da7..54f8582 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -68,47 +68,76 @@ object StressTestLog {
     })
 
     while(running.get) {
-      println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
       Thread.sleep(1000)
+      println("Reader offset = %d, writer offset = %d".format(reader.currentOffset, writer.currentOffset))
+      writer.checkProgress()
+      reader.checkProgress()
     }
   }
 
   abstract class WorkerThread extends Thread {
+    val threadInfo = "Thread: " + Thread.currentThread.getName + " Class: " + getClass.getName
+
     override def run() {
       try {
         while(running.get)
           work()
       } catch {
-        case e: Exception =>
+        case e: Exception => {
           e.printStackTrace()
-          running.set(false)
+        }
+      } finally {
+        running.set(false)
       }
-      println(getClass.getName + " exiting...")
     }
+
     def work()
+    def isMakingProgress(): Boolean
+  }
+
+  trait LogProgress {
+    @volatile var currentOffset = 0
+    private var lastOffsetCheckpointed = currentOffset
+    private var lastProgressCheckTime = System.currentTimeMillis
+
+    def isMakingProgress(): Boolean = {
+      if (currentOffset > lastOffsetCheckpointed) {
+        lastOffsetCheckpointed = currentOffset
+        return true
+      }
+
+      false
+    }
+
+    def checkProgress() {
+      // Check if we are making progress every 500ms
+      val curTime = System.currentTimeMillis
+      if ((curTime - lastProgressCheckTime) > 500) {
+        require(isMakingProgress(), "Thread not making progress")
+        lastProgressCheckTime = curTime
+      }
+    }
   }
 
-  class WriterThread(val log: Log) extends WorkerThread {
-    @volatile var offset = 0
+  class WriterThread(val log: Log) extends WorkerThread with LogProgress {
     override def work() {
-      val logAppendInfo = log.appendAsFollower(TestUtils.singletonRecords(offset.toString.getBytes))
-      require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
-      offset += 1
-      if(offset % 1000 == 0)
-        Thread.sleep(500)
+      val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0)
+      require(logAppendInfo.firstOffset.forall(_ == currentOffset) && logAppendInfo.lastOffset == currentOffset)
+      currentOffset += 1
+      if (currentOffset % 1000 == 0)
+        Thread.sleep(50)
     }
   }
 
-  class ReaderThread(val log: Log) extends WorkerThread {
-    @volatile var offset = 0
+  class ReaderThread(val log: Log) extends WorkerThread with LogProgress {
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1), isolationLevel = IsolationLevel.READ_UNCOMMITTED).records match {
+        log.read(currentOffset, 1024, Some(currentOffset + 1), isolationLevel = IsolationLevel.READ_UNCOMMITTED).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
             val first = read.batches.iterator.next()
-            require(first.lastOffset == offset, "We should either read nothing or the message we asked for.")
+            require(first.lastOffset == currentOffset, "We should either read nothing or the message we asked for.")
             require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes))
-            offset += 1
+            currentOffset += 1
           }
           case _ =>
         }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 152d6d3..4f5ba5c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -69,7 +69,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-    val largeMessageOffset = appendInfo.firstOffset
+    val largeMessageOffset = appendInfo.firstOffset.get
 
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
     val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
@@ -176,7 +176,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-      val largeMessageOffset = appendInfo.firstOffset
+      val largeMessageOffset = appendInfo.firstOffset.get
 
       // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
       props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
@@ -314,7 +314,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
               key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
       counter += 1
-      (key, value, appendInfo.firstOffset)
+      (key, value, appendInfo.firstOffset.get)
     }
   }
 
@@ -331,7 +331,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     }
 
     val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
-    val offsets = appendInfo.firstOffset to appendInfo.lastOffset
+    val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c12f617..906c26d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -147,17 +147,17 @@ class LogCleanerTest extends JUnitSuite {
 
     // check duplicate append from producer 1
     var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(0L, logAppendInfo.firstOffset.get)
     assertEquals(2L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 3
     logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
-    assertEquals(6L, logAppendInfo.firstOffset)
+    assertEquals(6L, logAppendInfo.firstOffset.get)
     assertEquals(7L, logAppendInfo.lastOffset)
 
     // check duplicate append from producer 2
     logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
-    assertEquals(3L, logAppendInfo.firstOffset)
+    assertEquals(3L, logAppendInfo.firstOffset.get)
     assertEquals(5L, logAppendInfo.lastOffset)
 
     // do one more append and a round of cleaning to force another deletion from producer 1's batch
@@ -173,7 +173,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // duplicate append from producer1 should still be fine
     logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(0L, logAppendInfo.firstOffset.get)
     assertEquals(2L, logAppendInfo.lastOffset)
   }
 
@@ -1082,16 +1082,17 @@ class LogCleanerTest extends JUnitSuite {
     val logConfig = LogConfig(logProps)
     val log = makeLog(config = logConfig)
     val cleaner = makeCleaner(Int.MaxValue)
-    val start = 0
-    val end = 2
-    val offsetSeq = Seq(0L, 7206178L)
-    writeToLog(log, (start until end) zip (start until end), offsetSeq)
-    cleaner.buildOffsetMap(log, start, end, map, new CleanerStats())
-    val endOffset = map.latestOffset
-    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
-    assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+    val keyStart = 0
+    val keyEnd = 2
+    val offsetStart = 0L
+    val offsetEnd = 7206178L
+    val offsetSeq = Seq(offsetStart, offsetEnd)
+    writeToLog(log, (keyStart until keyEnd) zip (keyStart until keyEnd), offsetSeq)
+    cleaner.buildOffsetMap(log, keyStart, offsetEnd + 1L, map, new CleanerStats())
+    assertEquals("Last offset should be the end offset.", offsetEnd, map.latestOffset)
+    assertEquals("Should have the expected number of messages in the map.", keyEnd - keyStart, map.size)
     assertEquals("Map should contain first value", 0L, map.get(key(0)))
-    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+    assertEquals("Map should contain second value", offsetEnd, map.get(key(1)))
   }
 
   /**
@@ -1265,7 +1266,7 @@ class LogCleanerTest extends JUnitSuite {
                 checkDone = checkDone)
 
   private def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
-    for ((key, value) <- seq) yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
+    for ((key, value) <- seq) yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset.get
   }
 
   private def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8a04914..2fdda6b 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -142,7 +142,7 @@ class LogManagerTest {
     for (_ <- 0 until numMessages) {
       val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.appendAsLeader(set, leaderEpoch = 0)
-      offset = info.firstOffset
+      offset = info.firstOffset.get
     }
 
     log.onHighWatermarkIncremented(log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c45ed0d..31bcf9c 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -85,7 +85,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(50, 53, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
     checkEquals(ms.records.iterator, read.records.iterator)
   }
@@ -99,7 +99,7 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = records(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, 52, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, ms)
     def validate(offset: Long) =
       assertEquals(ms.records.asScala.filter(_.offset == offset).toList,
                    seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.records.asScala.toList)
@@ -115,7 +115,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
@@ -128,9 +128,9 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -145,9 +145,9 @@ class LogSegmentTest {
     var offset = 40
     for (_ <- 0 until 30) {
       val ms1 = records(offset, "hello")
-      seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+      seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
       val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+      seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
@@ -207,7 +207,7 @@ class LogSegmentTest {
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
     var offset = 40
     for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, offset, records(offset, "hello"))
+      seg.append(offset, offset, offset, records(offset, "hello"))
       offset += 1
     }
     assertEquals(offset, seg.readNextOffset)
@@ -229,7 +229,7 @@ class LogSegmentTest {
     // test the case where we fully truncate the log
     val time = new MockTime
     val seg = createSegment(40, time = time)
-    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
 
     // If the segment is empty after truncation, the create time should be reset
     time.sleep(500)
@@ -241,7 +241,7 @@ class LogSegmentTest {
     assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
 
-    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
   /**
@@ -253,7 +253,7 @@ class LogSegmentTest {
     val seg = createSegment(40, messageSize * 2 - 1)
     // Produce some messages
     for (i <- 40 until 50)
-      seg.append(i, i, i * 10, i, records(i, s"msg$i"))
+      seg.append(i, i * 10, i, records(i, s"msg$i"))
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
@@ -277,7 +277,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.readNextOffset)
-    seg.append(50, 52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
+    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
     assertEquals(53, seg.readNextOffset)
   }
 
@@ -304,7 +304,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+      seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.offsetIndex.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
@@ -323,26 +323,26 @@ class LogSegmentTest {
     val pid2 = 10L
 
     // append transactional records from pid1
-    segment.append(firstOffset = 100L, largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 100L, MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
         pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append transactional records from pid2
-    segment.append(firstOffset = 102L, largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 102L, MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
         pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append non-transactional records
-    segment.append(firstOffset = 104L, largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE,
         partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
-    segment.append(firstOffset = 106L, largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
 
     // commit the transaction from pid1
-    segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+    segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
     var stateManager = new ProducerStateManager(topicPartition, logDir)
@@ -393,7 +393,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptTimeIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i, i * 10, i, records(i, i.toString))
+      seg.append(i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
@@ -413,7 +413,7 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for (i <- 0 until messagesAppended)
-        seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+        seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
 
@@ -445,9 +445,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -466,9 +466,9 @@ class LogSegmentTest {
       initFileSize = 512 * 1024 * 1024, preallocate = true)
 
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
     val oldSize = seg.log.sizeInBytes()
@@ -504,9 +504,9 @@ class LogSegmentTest {
 
     //Given two messages with a gap between them (e.g. mid offset compacted away)
     val ms1 = records(offset, "first message")
-    seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+    seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
     val ms2 = records(offset + 3, "message after gap")
-    seg.append(offset + 3, offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
 
     // When we truncate to an offset without a corresponding log entry
     seg.truncateTo(offset + 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6753939..ec74815 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -897,12 +897,12 @@ class LogTest {
       new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
     ), producerId = pid, producerEpoch = epoch, sequence = seq)
     val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
-    assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
+    assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset.get + 1, 3)
 
     // Append a Duplicate of the tail, when the entry at the tail has multiple records.
     val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
-      multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
+      multiEntryAppendInfo.firstOffset.get, dupMultiEntryAppendInfo.firstOffset.get)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
       multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
 
@@ -945,7 +945,7 @@ class LogTest {
       producerId = pid, producerEpoch = epoch, sequence = seq)
     val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
     val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
-    assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset.get, newAppendInfo.firstOffset.get)
     assertEquals("Inserted a duplicate records into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
   }
 
@@ -1387,7 +1387,7 @@ class LogTest {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
-                   log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset)
+                   log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset.get)
 
       // cleanup the log
       log.delete()
@@ -1919,17 +1919,96 @@ class LogTest {
     //Writes into an empty log with baseOffset 0
     log.appendAsFollower(set1)
     assertEquals(0L, log.activeSegment.baseOffset)
-    //This write will roll the segment, yielding a new segment with base offset = max(2, 1) = 2
+    //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
     log.appendAsFollower(set2)
-    assertEquals(2L, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, 2L).exists)
-    //This will also roll the segment, yielding a new segment with base offset = max(3, Integer.MAX_VALUE+3) = Integer.MAX_VALUE+3
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
+    //This will go into the existing log
+    log.appendAsFollower(set3)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    //This will go into the existing log
+    log.appendAsFollower(set4)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    log.close()
+    val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index"))
+    assertEquals(2, indexFiles.length)
+    for (file <- indexFiles) {
+      val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong)
+      assertTrue(offsetIndex.lastOffset >= 0)
+      offsetIndex.close()
+    }
+    Utils.delete(logDir)
+  }
+
+  @Test
+  def testOverCompactedLogRecoveryMultiRecord(): Unit = {
+    // append some messages to create some segments
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, 0,
+      new SimpleRecord("v3".getBytes(), "k3".getBytes()),
+      new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.GZIP, 0,
+      new SimpleRecord("v5".getBytes(), "k5".getBytes()),
+      new SimpleRecord("v6".getBytes(), "k6".getBytes()))
+    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 6, CompressionType.GZIP, 0,
+      new SimpleRecord("v7".getBytes(), "k7".getBytes()),
+      new SimpleRecord("v8".getBytes(), "k8".getBytes()))
+    //Writes into an empty log with baseOffset 0
+    log.appendAsFollower(set1)
+    assertEquals(0L, log.activeSegment.baseOffset)
+    //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
+    log.appendAsFollower(set2)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
+    //This will go into the existing log
+    log.appendAsFollower(set3)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    //This will go into the existing log
+    log.appendAsFollower(set4)
+    assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
+    log.close()
+    val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index"))
+    assertEquals(2, indexFiles.length)
+    for (file <- indexFiles) {
+      val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong)
+      assertTrue(offsetIndex.lastOffset >= 0)
+      offsetIndex.close()
+    }
+    Utils.delete(logDir)
+  }
+
+  @Test
+  def testOverCompactedLogRecoveryMultiRecordV1(): Unit = {
+    // append some messages to create some segments
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, CompressionType.NONE,
+      new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP,
+      new SimpleRecord("v3".getBytes(), "k3".getBytes()),
+      new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set3 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 4, CompressionType.GZIP,
+      new SimpleRecord("v5".getBytes(), "k5".getBytes()),
+      new SimpleRecord("v6".getBytes(), "k6".getBytes()))
+    val set4 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 6, CompressionType.GZIP,
+      new SimpleRecord("v7".getBytes(), "k7".getBytes()),
+      new SimpleRecord("v8".getBytes(), "k8".getBytes()))
+    //Writes into an empty log with baseOffset 0
+    log.appendAsFollower(set1)
+    assertEquals(0L, log.activeSegment.baseOffset)
+    //This write will roll the segment, yielding a new segment with base offset = max(1, 3) = 3
+    log.appendAsFollower(set2)
+    assertEquals(3, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, 3).exists)
+    //This will also roll the segment, yielding a new segment with base offset = max(5, Integer.MAX_VALUE+4) = Integer.MAX_VALUE+4
     log.appendAsFollower(set3)
-    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 3).exists)
+    assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
+    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 4).exists)
     //This will go into the existing log
     log.appendAsFollower(set4)
-    assertEquals(Integer.MAX_VALUE.toLong + 3, log.activeSegment.baseOffset)
+    assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
     log.close()
     val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index"))
     assertEquals(3, indexFiles.length)
@@ -2534,7 +2613,7 @@ class LogTest {
       new SimpleRecord("baz".getBytes))
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // add more transactional records
     seq += 3
@@ -2542,14 +2621,14 @@ class LogTest {
       new SimpleRecord("blah".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // now transaction is committed
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
 
     // first unstable offset is not updated until the high watermark is advanced
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
     log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
@@ -2885,7 +2964,7 @@ class LogTest {
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // mix in some non-transactional data
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -2900,7 +2979,7 @@ class LogTest {
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // now first producer's transaction is aborted
     val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
@@ -2908,7 +2987,7 @@ class LogTest {
     log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second transaction
-    assertEquals(Some(secondAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(secondAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
 
     // commit the second transaction
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
@@ -2934,7 +3013,7 @@ class LogTest {
     val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
 
     // this write should spill to the second segment
@@ -2943,7 +3022,7 @@ class LogTest {
       new SimpleRecord("d".getBytes),
       new SimpleRecord("e".getBytes),
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 

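For context on the Log.maybeRoll change in this patch: below is a minimal, self-contained Scala sketch of how the base offset for a rolled segment is now chosen. The helper name chooseRollOffset and the object name are hypothetical and not part of the commit; the logic mirrors the match on appendInfo.firstOffset added to Log.scala above.

object RollOffsetSketch {
  // For message format V2 and above, the exact first offset is available from the
  // batch header, so the new segment can be rolled at that offset directly.
  // For older formats, the pre-existing heuristic is kept: offsets within a message
  // set never differ by more than Int.MaxValue, so lastOffset - Int.MaxValue is
  // guaranteed to be <= the real first offset (it may undershoot, but never overshoots).
  def chooseRollOffset(firstOffset: Option[Long], lastOffset: Long): Long =
    firstOffset.getOrElse(lastOffset - Int.MaxValue)

  def main(args: Array[String]): Unit = {
    val lastOffset = Int.MaxValue.toLong + 3
    // V2+ batch: header carries the exact first offset
    println(chooseRollOffset(Some(Int.MaxValue.toLong + 2), lastOffset)) // 2147483649
    // Pre-V2 batch: fall back to the heuristic
    println(chooseRollOffset(None, lastOffset))                          // 3
  }
}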
-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
