kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) (#5749)
Date Fri, 05 Oct 2018 22:01:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 68b1d49  KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) (#5749)
68b1d49 is described below

commit 68b1d49f4ea73aaab589e6054a1078db149db59b
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Oct 5 15:01:30 2018 -0700

    KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678) (#5749)
    
    This patch ensures that when a broker becomes leader, the leader epoch cache is updated with the latest epoch and the log end offset as that epoch's start offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.
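
    A minimal, self-contained sketch of the scenario described above (illustrative names and a simplified cache, not the patch's API; the actual change is in Partition.scala and LeaderEpochFileCache.scala below):

        object BackToBackElectionSketch extends App {
          case class EpochEntry(epoch: Int, startOffset: Long)

          // Simplified end-offset lookup: the start offset of the first later epoch,
          // or the log end offset if no later epoch is cached.
          def endOffsetFor(cache: Seq[EpochEntry], requestedEpoch: Int, logEndOffset: Long): Long =
            cache.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(logEndOffset)

          // Broker B becomes leader for epoch 5 while its log ends at offset 100. With this
          // patch it caches (epoch=5, startOffset=100) immediately, before any new append.
          val leaderCache = Seq(EpochEntry(3, 0), EpochEntry(5, 100))

          // A follower that appended records under epoch 4 (which B never saw) asks for the
          // end offset of epoch 4 and learns it must truncate its log to offset 100.
          println(endOffsetFor(leaderCache, requestedEpoch = 4, logEndOffset = 100)) // prints 100
        }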
    
    Additionally, we have made the following changes:
    
    1. The leader epoch cache enforces monotonically increasing epochs and starting offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
    2. Previously we returned an unknown epoch and offset if the queried epoch preceded the first entry in the cache. Now we return the start offset of the earliest entry in the cache as the end offset for the requested epoch. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine the correct starting point of the active log range on the leader. Both rules are sketched below.
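
    A rough sketch of both rules together (hypothetical, simplified code; the real logic lives in LeaderEpochFileCache.truncateAndAppend and endOffsetFor in the diff below):

        import scala.collection.mutable.ListBuffer

        object EpochCacheRulesSketch extends App {
          case class EpochEntry(epoch: Int, startOffset: Long)
          val epochs = ListBuffer[EpochEntry]()

          // Rule 1: retain only entries that are strictly smaller in both epoch and start
          // offset, then append; conflicting entries are dropped rather than rejected.
          def assign(epoch: Int, startOffset: Long): Unit = {
            val retained = epochs.filter(e => e.epoch < epoch && e.startOffset < startOffset)
            epochs.clear()
            epochs ++= retained
            epochs += EpochEntry(epoch, startOffset)
          }

          // Rule 2: a query below the earliest cached epoch returns that entry's start offset
          // (this simplified version also returns the log end offset for the latest epoch).
          def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): Long =
            epochs.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(logEndOffset)

          assign(5, 10); assign(7, 20)
          println(endOffsetFor(requestedEpoch = 4, logEndOffset = 25)) // 10, earliest start offset
          assign(6, 15)   // conflicts with (7, 20), which is removed from the cache
          println(epochs) // ListBuffer(EpochEntry(5,10), EpochEntry(6,15))
        }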
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |   3 +-
 core/src/main/scala/kafka/log/Log.scala            |  20 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   6 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   6 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 196 +++++----
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  41 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |   4 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  14 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   8 +-
 .../LeaderEpochCheckpointFileTest.scala            |   1 -
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  24 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala    | 468 ++++++++-------------
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  52 ++-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   5 +-
 17 files changed, 397 insertions(+), 470 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e3a8186..ac0de9d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -280,8 +280,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 4b65e43..462f1f3 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
@@ -55,7 +56,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
-  val epochs = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9b423ba..eeb569a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -39,7 +39,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import java.util.Map.{Entry => JEntry}
@@ -208,7 +208,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
 
   locally {
     val startMs = time.milliseconds
@@ -218,12 +218,12 @@ class Log(@volatile var dir: File,
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)
 
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
 
@@ -271,11 +271,11 @@ class Log(@volatile var dir: File,
 
   def leaderEpochCache = _leaderEpochCache
 
-  private def initializeLeaderEpochCache(): LeaderEpochCache = {
+  private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
+    val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
+    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
   }
 
   private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
@@ -352,7 +352,7 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+  private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
     val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
     logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
@@ -830,7 +830,7 @@ class Log(@volatile var dir: File,
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
-          _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+          _leaderEpochCache.truncateFromStart(logStartOffset)
           producerStateManager.truncateHead(logStartOffset)
           updateFirstUnstableOffset()
         }
@@ -1513,7 +1513,7 @@ class Log(@volatile var dir: File,
             updateLogEndOffset(targetOffset)
             this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
             this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-            _leaderEpochCache.clearAndFlushLatest(targetOffset)
+            _leaderEpochCache.truncateFromEnd(targetOffset)
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
           true
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5970f42..0124946 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -22,7 +22,7 @@ import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
 
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -265,7 +265,7 @@ class LogSegment private[log] (val log: FileRecords,
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
     offsetIndex.reset()
     timeIndex.reset()
     txnIndex.reset()
@@ -293,7 +293,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+            if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
           updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 74ef3e8..9b5c1bb 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -29,7 +29,7 @@ import ReplicaAlterLogDirsThread.FetchRequest
 import ReplicaAlterLogDirsThread.PartitionData
 import kafka.api.Request
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
@@ -58,7 +58,7 @@ class ReplicaAlterLogDirsThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
+  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
 
   def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
     var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
@@ -148,7 +148,7 @@ class ReplicaAlterLogDirsThread(name: String,
 
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5e0e9be..4fceef8 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,7 +23,7 @@ import AbstractFetcherThread.ResultWithPartitions
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.server.ReplicaFetcherThread._
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.TopicPartition
@@ -79,7 +79,7 @@ class ReplicaFetcherThread(name: String,
   private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
-  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
+  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
 
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
@@ -324,7 +324,7 @@ class ReplicaFetcherThread(name: String,
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
     debug(s"Build leaderEpoch request $partitionsWithEpoch")
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 220432d..10bdb17 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -18,53 +18,69 @@ package kafka.server.epoch
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.LeaderEpochCheckpoint
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import scala.collection.mutable.ListBuffer
 
-trait LeaderEpochCache {
-  def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch(): Int
-  def endOffsetFor(epoch: Int): Long
-  def clearAndFlushLatest(offset: Long)
-  def clearAndFlushEarliest(offset: Long)
-  def clearAndFlush()
-  def clear()
-}
+import scala.collection.mutable.ListBuffer
 
 /**
-  * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
-  *
-  * Leader Epoch = epoch assigned to each leader by the controller.
-  * Offset = offset of the first message in each epoch.
-  *
-  * @param leo a function that determines the log end offset
-  * @param checkpoint the checkpoint file
-  */
-class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+ *
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ *
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ * @param logEndOffset function to fetch the current log end offset
+ */
+class LeaderEpochFileCache(topicPartition: TopicPartition,
+                           logEndOffset: () => Long,
+                           checkpoint: LeaderEpochCheckpoint) extends Logging {
+  this.logIdent = s"[LeaderEpochCache $topicPartition] "
+
   private val lock = new ReentrantReadWriteLock()
   private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
 
   /**
     * Assigns the supplied Leader Epoch to the supplied Offset
     * Once the epoch is assigned it cannot be reassigned
-    *
-    * @param epoch
-    * @param offset
     */
-  override def assign(epoch: Int, offset: Long): Unit = {
+  def assign(epoch: Int, startOffset: Long): Unit = {
     inWriteLock(lock) {
-      if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
-        info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
-        epochs += EpochEntry(epoch, offset)
-        flush()
+      val updateNeeded = if (epochs.isEmpty) {
+        true
       } else {
-        validateAndMaybeWarn(epoch, offset)
+        val lastEntry = epochs.last
+        lastEntry.epoch != epoch || startOffset < lastEntry.startOffset
       }
+
+      if (updateNeeded) {
+        truncateAndAppend(EpochEntry(epoch, startOffset))
+        flush()
+      }
+    }
+  }
+
+  /**
+   * Remove any entries which violate monotonicity following the insertion of an assigned epoch.
+   */
+  private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
+    validateAndMaybeWarn(entryToAppend)
+
+    val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
+      entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
+    }
+
+    epochs = retainedEpochs :+ entryToAppend
+
+    if (removedEpochs.isEmpty) {
+      debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
+    } else {
+      warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
+        s"Cache now contains ${epochs.size} entries.")
     }
   }
 
@@ -74,7 +90,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @return
     */
-  override def latestEpoch(): Int = {
+  def latestEpoch: Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
     }
@@ -90,42 +106,53 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     * if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET will be returned
     * so that the follower falls back to High Water Mark.
     *
-    * @param requestedEpoch
-    * @return offset
+    * @param requestedEpoch requested leader epoch
+    * @return found end offset
     */
-  override def endOffsetFor(requestedEpoch: Int): Long = {
+  def endOffsetFor(requestedEpoch: Int): Long = {
     inReadLock(lock) {
-      val offset =
+      val endOffset =
         if (requestedEpoch == UNDEFINED_EPOCH) {
-          // this may happen if a bootstrapping follower sends a request with undefined epoch or
+          // This may happen if a bootstrapping follower sends a request with undefined epoch or
           // a follower is on the older message format where leader epochs are not recorded
           UNDEFINED_EPOCH_OFFSET
         } else if (requestedEpoch == latestEpoch) {
-          leo().messageOffset
+          // For the leader, the latest epoch is always the current leader epoch that is still being written to.
+          // Followers should not have any reason to query for the end offset of the current epoch, but a consumer
+          // might if it is verifying its committed offset following a group rebalance. In this case, we return
+          // the current log end offset which makes the truncation check work as expected.
+          logEndOffset()
         } else {
-          val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
-          if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
+          val subsequentEpochs = epochs.filter { e => e.epoch > requestedEpoch}
+          if (subsequentEpochs.isEmpty) {
+            // The requested epoch is larger than any known epoch. This case should never be hit because
+            // the latest cached epoch is always the largest.
             UNDEFINED_EPOCH_OFFSET
-          else
+          } else {
+            // We have at least one previous epoch and one subsequent epoch. The result is the first
+            // prior epoch and the starting offset of the first subsequent epoch.
             subsequentEpochs.head.startOffset
+          }
         }
-      debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size ${epochs.size}")
-      offset
+      debug(s"Processed end offset request for epoch $requestedEpoch and returning end offset $endOffset " +
+        s"from epoch cache of size ${epochs.size}")
+      endOffset
     }
   }
 
   /**
     * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
-    *
-    * @param offset
     */
-  override def clearAndFlushLatest(offset: Long): Unit = {
+  def truncateFromEnd(endOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && offset <= latestOffset()) {
-        epochs = epochs.filter(entry => entry.startOffset < offset)
+      if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset)
+        epochs = previousEntries
+
         flush()
-        info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+        debug(s"Cleared entries $subsequentEntries from epoch cache after " +
+          s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
       }
     }
   }
@@ -136,20 +163,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
     *
-    * @param offset the offset to clear up to
+    * @param startOffset the offset to clear up to
     */
-  override def clearAndFlushEarliest(offset: Long): Unit = {
+  def truncateFromStart(startOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && earliestOffset() < offset) {
-        val earliest = epochs.filter(entry => entry.startOffset < offset)
-        if (earliest.nonEmpty) {
-          epochs = epochs --= earliest
-          //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
-          if (offset < earliestOffset() || epochs.isEmpty)
-            new EpochEntry(earliest.last.epoch, offset) +=: epochs
+      if (epochs.nonEmpty) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset)
+
+        previousEntries.lastOption.foreach { firstBeforeStartOffset =>
+          val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
+          epochs = updatedFirstEntry +: subsequentEntries
+
           flush()
-          info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+          debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " +
+            s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
         }
       }
     }
@@ -158,47 +186,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
   /**
     * Delete all entries.
     */
-  override def clearAndFlush() = {
+  def clearAndFlush() = {
     inWriteLock(lock) {
       epochs.clear()
       flush()
     }
   }
 
-  override def clear() = {
+  def clear() = {
     inWriteLock(lock) {
       epochs.clear()
     }
   }
 
-  def epochEntries(): ListBuffer[EpochEntry] = {
+  // Visible for testing
+  def epochEntries: ListBuffer[EpochEntry] = {
     epochs
   }
 
-  private def earliestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.head.startOffset
-  }
-
-  private def latestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.last.startOffset
-  }
+  private def latestEntry: Option[EpochEntry] = epochs.lastOption
 
   private def flush(): Unit = {
     checkpoint.write(epochs)
   }
 
-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition"
-
-  def validateAndMaybeWarn(epoch: Int, offset: Long) = {
-    assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
-    if (epoch < latestEpoch())
-      warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
-    else if (offset < latestOffset())
-      warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+  private def validateAndMaybeWarn(entry: EpochEntry) = {
+    if (entry.epoch < 0) {
+      throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
+    } else {
+      // If the latest append violates the monotonicity of epochs or starting offsets, our choices
+      // are either to raise an error, ignore the append, or allow the append and truncate the
+      // conflicting entries from the cache. Raising an error risks killing the fetcher threads in
+      // pathological cases (i.e. cases we are not yet aware of). We instead take the final approach
+      // and assume that the latest append is always accurate.
+
+      latestEntry.foreach { latest =>
+        if (entry.epoch < latest.epoch)
+          warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " +
+            s"of the latest entry $latest. This implies messages have arrived out of order.")
+        else if (entry.startOffset < latest.startOffset)
+          warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " +
+            s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.")
+      }
+    }
   }
 }
 
 // Mapping of epoch to the first offset of the subsequent epoch
-case class EpochEntry(epoch: Int, startOffset: Long)
+case class EpochEntry(epoch: Int, startOffset: Long) {
+  override def toString: String = {
+    s"EpochEntry(epoch=$epoch, startOffset=$startOffset)"
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index cf679f6..e0dcdfd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -29,7 +29,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
-import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -268,7 +268,7 @@ class LogTest {
             }
 
             override def recover(producerStateManager: ProducerStateManager,
-                                 leaderEpochCache: Option[LeaderEpochCache]): Int = {
+                                 leaderEpochCache: Option[LeaderEpochFileCache]): Int = {
               recoveredSegments += this
               super.recover(producerStateManager, leaderEpochCache)
             }
@@ -2246,8 +2246,8 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
-    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
+    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2256,7 +2256,7 @@ class LogTest {
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2269,12 +2269,12 @@ class LogTest {
     log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
 
     log.close()
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2447,7 +2447,7 @@ class LogTest {
     for (i <- records.indices)
       log.appendAsFollower(recordsForEpoch(i))
 
-    assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch())
+    assertEquals(42, log.leaderEpochCache.latestEpoch)
   }
 
   @Test
@@ -2502,19 +2502,24 @@ class LogTest {
 
   @Test
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-    val logConfig = createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
+    def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
+      TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
+        baseOffset = startOffset, partitionLeaderEpoch = epoch)
+    }
+
+    val logConfig = createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
-    //Given 2 segments, 10 messages per segment
-    for (epoch <- 1 to 20)
-      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    def append(epoch: Int, startOffset: Long, count: Int): Unit = {
+      for (i <- 0 until count)
+        log.appendAsFollower(createRecords(startOffset + i, epoch))
+    }
 
-    //Simulate some leader changes at specific offsets
-    cache.assign(0, 0)
-    cache.assign(1, 10)
-    cache.assign(2, 16)
+    //Given 2 segments, 10 messages per segment
+    append(epoch = 0, startOffset = 0, count = 10)
+    append(epoch = 1, startOffset = 10, count = 6)
+    append(epoch = 2, startOffset = 16, count = 4)
 
     assertEquals(2, log.numberOfSegments)
     assertEquals(20, log.logEndOffset)
@@ -2566,7 +2571,7 @@ class LogTest {
     assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
     // deliberately remove some of the epoch entries
-    leaderEpochCache.clearAndFlushLatest(2)
+    leaderEpochCache.truncateFromEnd(2)
     assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
     log.close()
 
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 8212ed6..cb914c4 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -218,7 +218,7 @@ class IsrExpirationTest {
 
   private def logMock: Log = {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 2074044..1f4d04b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -19,8 +19,8 @@ package kafka.server
 import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
-import kafka.server.epoch.LeaderEpochCache
 import org.apache.kafka.common.requests.EpochEndOffset._
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -109,7 +109,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -167,7 +167,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -213,7 +213,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -258,7 +258,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[kafka.log.LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -314,7 +314,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -362,7 +362,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3be33a2..01ba4b0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -624,8 +624,8 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
-    EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
+    EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn(localLogOffset)
     EasyMock.replay(mockLeaderEpochCache)
@@ -644,7 +644,7 @@ class ReplicaManagerTest {
         new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000),
       logDirFailureChannel = mockLogDirFailureChannel) {
 
-      override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
+      override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache
 
       override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
     }
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
index e7c6a97..0c47f15 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
@@ -24,7 +24,6 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
-
 class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 6288d8f..bd87bc2 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -89,23 +89,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
 
     //Both leader and follower should have recorded Epoch 0 at Offset 0
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Bounce the follower
     bounce(follower)
     awaitISR(tp)
 
     //Nothing happens yet as we haven't sent any new messages.
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
 
     //Epoch1 should now propagate to the follower with the written message
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //The new message should have epoch 1 stamped
     assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
@@ -116,8 +116,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     awaitISR(tp)
 
     //Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication.
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
@@ -127,8 +127,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
 
     //The leader epoch files should now match on leader and follower
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries)
   }
 
   @Test
@@ -300,8 +300,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
     info(s"Bounce complete for follower ${follower.config.brokerId}")
-    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
-    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries())
+    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries)
+    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries)
   }
 
   private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 4a8df11..6cd08c7 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -16,6 +16,7 @@
   */
 
 package kafka.server.epoch
+
 import java.io.File
 
 import kafka.server.LogOffsetMetadata
@@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.Test
 
 import scala.collection.mutable.ListBuffer
 
@@ -33,51 +34,42 @@ import scala.collection.mutable.ListBuffer
   */
 class LeaderEpochFileCacheTest {
   val tp = new TopicPartition("TestTopic", 5)
-  var checkpoint: LeaderEpochCheckpoint = _
+  private var logEndOffset = 0L
+  private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    private var epochs: Seq[EpochEntry] = Seq()
+    override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+  private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
 
   @Test
   def shouldAddEpochAndMessageOffsetToCache() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 10)
-    leo = 11
+    cache.assign(epoch = 2, startOffset = 10)
+    logEndOffset = 11
 
     //Then
-    assertEquals(2, cache.latestEpoch())
-    assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
-    assertEquals(11, cache.endOffsetFor(2)) //should match leo
+    assertEquals(2, cache.latestEpoch)
+    assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
+    assertEquals(logEndOffset, cache.endOffsetFor(2)) //should match logEndOffset
   }
 
   @Test
   def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When just one epoch
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 2, offset = 12)
-    leo = 14
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 2, startOffset = 12)
+    logEndOffset = 14
 
     //Then
-    assertEquals(14, cache.endOffsetFor(2))
+    assertEquals(logEndOffset, cache.endOffsetFor(2))
   }
 
   @Test
   def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given cache with some data on leader
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 3, offset = 12)
+    // assign couple of epochs
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 3, startOffset = 12)
 
     //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
     val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -88,68 +80,51 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    leo = 9
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 9
 
-    cache.assign(2, leo)
+    cache.assign(2, logEndOffset)
 
     //When called again later
     cache.assign(2, 10)
 
     //Then the offset should NOT have been updated
-    assertEquals(leo, cache.epochEntries()(0).startOffset)
+    assertEquals(logEndOffset, cache.epochEntries(0).startOffset)
+    assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries)
   }
 
   @Test
-  def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
+  def shouldEnforceMonotonicallyIncreasingStartOffsets() = {
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
     cache.assign(2, 9)
 
     //When update epoch new epoch but same offset
     cache.assign(3, 9)
 
     //Then epoch should have been updated
-    assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries())
+    assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries)
   }
   
   @Test
   def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint)
     cache.assign(2, 6)
 
     //When called again later with a greater offset
     cache.assign(2, 10)
 
     //Then later update should have been ignored
-    assertEquals(6, cache.epochEntries()(0).startOffset)
+    assertEquals(6, cache.epochEntries(0).startOffset)
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecorded(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
-    val leo = 73
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 73
 
     //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
     val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -159,39 +134,41 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
-    cache.assign(epoch = 5, offset = 11)
-    cache.assign(epoch = 6, offset = 12)
-    cache.assign(epoch = 7, offset = 13)
+  def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
     //When
-    val offset = cache.endOffsetFor(5 - 1)
+    val endOffset = cache.endOffsetFor(4)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH_OFFSET, offset)
+    assertEquals(11, endOffset)
   }
 
   @Test
-  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
+  def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = {
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    // epoch 7 starts at an earlier offset
+    cache.assign(epoch = 7, startOffset = 12)
 
+    assertEquals(12, cache.endOffsetFor(5))
+    assertEquals(12, cache.endOffsetFor(6))
+  }
+
+  @Test
+  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
     //When several epochs
-    cache.assign(epoch = 1, offset = 11)
-    cache.assign(epoch = 1, offset = 12)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 2, offset = 14)
-    cache.assign(epoch = 3, offset = 15)
-    cache.assign(epoch = 3, offset = 16)
-    leo = 17
+    cache.assign(epoch = 1, startOffset = 11)
+    cache.assign(epoch = 1, startOffset = 12)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 2, startOffset = 14)
+    cache.assign(epoch = 3, startOffset = 15)
+    cache.assign(epoch = 3, startOffset = 16)
+    logEndOffset = 17
 
     //Then get the start offset of the next epoch
     assertEquals(15, cache.endOffsetFor(2))
@@ -199,15 +176,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 0, offset = 10)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 4, offset = 17)
+    cache.assign(epoch = 0, startOffset = 10)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 4, startOffset = 17)
 
     //Then
     assertEquals(13, cache.endOffsetFor(requestedEpoch = 1))
@@ -216,14 +188,9 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 2, offset = 7)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 2, startOffset = 7)
 
     //Then
     assertEquals(1, cache.epochEntries.size)
@@ -232,14 +199,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
-    val leo = 100
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 100
 
     //When
-    cache.assign(epoch = 2, offset = 100)
+    cache.assign(epoch = 2, startOffset = 100)
 
     //Then
     assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3))
@@ -247,35 +210,28 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    leo = 7
+    cache.assign(epoch = 2, startOffset = 6)
+    logEndOffset = 7
 
     //Then
-    assertEquals(leo, cache.endOffsetFor(2))
+    assertEquals(logEndOffset, cache.endOffsetFor(2))
     assertEquals(1, cache.epochEntries.size)
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
+    assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
   }
 
   @Test
   def shouldPersistEpochsBetweenInstances(){
-    def leoFinder() = new LogOffsetMetadata(0)
     val checkpointPath = TestUtils.tempFile().getAbsolutePath
-    checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
+    val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
 
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
+    val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+    cache.assign(epoch = 2, startOffset = 6)
 
     //When
     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
-    val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2)
+    val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
 
     //Then
     assertEquals(1, cache2.epochEntries.size)
@@ -283,81 +239,68 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
     //Given
-    cache.assign(epoch = 1, offset = 5); leo = 6
-    cache.assign(epoch = 2, offset = 6); leo = 7
-
-    //When we update an epoch in the past with an earlier offset
-    cache.assign(epoch = 1, offset = 7); leo = 8
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
 
-    //Then epoch should not be changed
-    assertEquals(2, cache.latestEpoch())
+    //When we update an epoch in the past with a different offset, the log has already reached
+    //an inconsistent state. Our options are either to raise an error, ignore the new append,
+    //or truncate the cached epochs to the point of conflict. We take this latter approach in
+    //order to guarantee that epochs and offsets in the cache increase monotonically, which makes
+    //the search logic simpler to reason about.
+    cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
 
-    //Then end offset for epoch 1 shouldn't have changed
-    assertEquals(6, cache.endOffsetFor(1))
+    //Then later epochs will be removed
+    assertEquals(1, cache.latestEpoch)
 
-    //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't thing of a better option)
-    assertEquals(8, cache.endOffsetFor(2))
+    //Then end offset for epoch 1 will have changed
+    assertEquals(8, cache.endOffsetFor(1))
 
-    //Epoch history shouldn't have changed
-    assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(1))
+    //Then end offset for epoch 2 is now undefined
+    assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(2))
+    assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
   }
 
   @Test
-  def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceOffsetsIncreaseMonotonically() = {
     //When epoch goes forward but offset goes backwards
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 5)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 5)
 
-    //Then latter assign should be ignored
-    assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
+    //The last assignment wins and the conflicting one is removed from the cache
+    assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0))
   }
 
   @Test
   def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0) //leo=0
+    cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0
 
     //When
-    cache.assign(epoch = 1, offset = 0) //leo=0
+    cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
 
     //Then epoch should go up
-    assertEquals(1, cache.latestEpoch())
+    assertEquals(1, cache.latestEpoch)
     //offset for 1 should still be 0
     assertEquals(0, cache.endOffsetFor(1))
     //offset for epoch 0 should still be 0
     assertEquals(0, cache.endOffsetFor(0))
 
     //When we write 5 messages as epoch 1
-    leo = 5
+    logEndOffset = 5
 
-    //Then end offset for epoch(1) should be leo => 5
+    //Then end offset for epoch(1) should be logEndOffset => 5
     assertEquals(5, cache.endOffsetFor(1))
     //Epoch 0 should still be at offset 0
     assertEquals(0, cache.endOffsetFor(0))
 
     //When
-    cache.assign(epoch = 2, offset = 5) //leo=5
+    cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5
 
-    leo = 10 //write another 5 messages
+    logEndOffset = 10 //write another 5 messages
 
-    //Then end offset for epoch(2) should be leo => 10
+    //Then end offset for epoch(2) should be logEndOffset => 10
     assertEquals(10, cache.endOffsetFor(2))
 
     //end offset for epoch(1) should be the start offset of epoch(2) => 5
@@ -369,36 +312,30 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //When new
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When Messages come in
-    cache.assign(epoch = 0, offset = 0); leo = 1
-    cache.assign(epoch = 0, offset = 1); leo = 2
-    cache.assign(epoch = 0, offset = 2); leo = 3
+    cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1
+    cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2
+    cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
 
     //Then epoch should stay, offsets should grow
-    assertEquals(0, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(0))
+    assertEquals(0, cache.latestEpoch)
+    assertEquals(logEndOffset, cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
-    cache.assign(epoch = 1, offset = 3); leo = 4
-    cache.assign(epoch = 1, offset = 4); leo = 5
-    cache.assign(epoch = 1, offset = 5); leo = 6
+    cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4
+    cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
 
-    assertEquals(1, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(1))
+    assertEquals(1, cache.latestEpoch)
+    assertEquals(logEndOffset, cache.endOffsetFor(1))
 
     //When
-    cache.assign(epoch = 2, offset = 6); leo = 7
-    cache.assign(epoch = 2, offset = 7); leo = 8
-    cache.assign(epoch = 2, offset = 8); leo = 9
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
+    cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
+    cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
 
-    assertEquals(2, cache.latestEpoch())
-    assertEquals(leo, cache.endOffsetFor(2))
+    assertEquals(2, cache.latestEpoch)
+    assertEquals(logEndOffset, cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the subsequent epoch.
     assertEquals(3, cache.endOffsetFor(0))
@@ -407,16 +344,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When clear latest on epoch boundary
-    cache.clearAndFlushLatest(offset = 8)
+    cache.truncateFromEnd(endOffset = 8)
 
     //Then should remove two latest epochs (remove is inclusive)
     assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
@@ -424,16 +358,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset ON epoch boundary
-    cache.clearAndFlushEarliest(offset = 8)
+    cache.truncateFromStart(startOffset = 8)
 
     //Then should preserve (3, 8)
     assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -441,16 +372,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -458,16 +386,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset before first epoch offset
-    cache.clearAndFlushEarliest(offset = 1)
+    cache.truncateFromStart(startOffset = 1)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -475,16 +400,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on earliest epoch boundary
-    cache.clearAndFlushEarliest(offset = 6)
+    cache.truncateFromStart(startOffset = 6)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -492,16 +414,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When
-    cache.clearAndFlushEarliest(offset = 11)
+    cache.truncateFromStart(startOffset = 11)
 
     //Then retain the last
     assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -509,16 +428,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When we clear from a position between offset 8 & offset 11
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should update the middle epoch entry's offset
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -526,16 +442,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0)
-    cache.assign(epoch = 1, offset = 7)
-    cache.assign(epoch = 2, offset = 10)
+    cache.assign(epoch = 0, startOffset = 0)
+    cache.assign(epoch = 1, startOffset = 7)
+    cache.assign(epoch = 2, startOffset = 10)
 
     //When we clear from a position between offset 0 & offset 7
-    cache.clearAndFlushEarliest(offset = 5)
+    cache.truncateFromStart(startOffset = 5)
 
     //Then we should keep epoch 0 but update the offset appropriately
     assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -543,16 +456,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset beyond last epoch
-    cache.clearAndFlushEarliest(offset = 15)
+    cache.truncateFromStart(startOffset = 15)
 
     //Then update the last
     assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -560,51 +470,42 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushLatest(offset = 9)
+    cache.truncateFromEnd(endOffset = 9)
 
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestEpoch())
+    assertEquals(3, cache.latestEpoch)
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
   }
 
   @Test
   def shouldClearAllEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
-    //When 
+    //When
     cache.clearAndFlush()
 
-    //Then 
+    //Then
     assertEquals(0, cache.epochEntries.size)
   }
 
   @Test
   def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -612,16 +513,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -629,54 +527,26 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldFetchLatestEpochOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.latestEpoch)
   }
 
   @Test
   def shouldFetchEndOffsetOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.endOffsetFor(7))
   }
 
   @Test
   def shouldClearEarliestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushEarliest(7)
+    cache.truncateFromStart(7)
   }
 
   @Test
   def shouldClearLatestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushLatest(7)
+    cache.truncateFromEnd(7)
   }
 
-  @Before
-  def setUp() {
-    checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
-  }
 }
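
To make the revised cache semantics easier to follow, here is a small standalone sketch of the
assignment and lookup rules the tests above exercise. MiniEpochCache is a hypothetical name and
not the LeaderEpochFileCache implementation (which also persists entries to the checkpoint file
under a lock); it only models the behavior: conflicting assignments truncate the cache so that
epochs and start offsets stay monotonic, and endOffsetFor returns the next cached epoch's start
offset, the log end offset for the latest epoch, or -1 (UNDEFINED_EPOCH_OFFSET) for epochs newer
than anything cached.

    import scala.collection.mutable.ListBuffer

    case class EpochEntry(epoch: Int, startOffset: Long)

    class MiniEpochCache(logEndOffset: () => Long) {
      private val UndefinedOffset = -1L
      val entries: ListBuffer[EpochEntry] = ListBuffer.empty

      // Only update when the epoch changes or the start offset moves backwards; on update,
      // drop any cached entry that would break monotonicity, so the last assignment wins.
      def assign(epoch: Int, startOffset: Long): Unit = {
        val updateNeeded = entries.lastOption.forall(last => last.epoch != epoch || startOffset < last.startOffset)
        if (updateNeeded) {
          val retained = entries.filter(e => e.epoch < epoch && e.startOffset < startOffset)
          entries.clear()
          entries ++= retained
          entries += EpochEntry(epoch, startOffset)
        }
      }

      // End offset of an epoch: the start offset of the next cached epoch, the log end offset
      // for the latest cached epoch, or undefined (-1) for epochs newer than anything cached.
      def endOffsetFor(requestedEpoch: Int): Long = entries.lastOption match {
        case Some(last) if requestedEpoch <= last.epoch =>
          entries.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(logEndOffset())
        case _ => UndefinedOffset
      }

      def latestEpoch: Int = entries.lastOption.map(_.epoch).getOrElse(-1)
    }

    object MiniEpochCacheDemo extends App {
      var leo = 0L
      val cache = new MiniEpochCache(() => leo)

      // Mirrors shouldEnforceMonotonicallyIncreasingEpochs: assigning a conflicting entry for
      // an older epoch truncates the cache from the point of conflict.
      cache.assign(epoch = 1, startOffset = 5); leo = 6
      cache.assign(epoch = 2, startOffset = 6); leo = 7
      cache.assign(epoch = 1, startOffset = 7); leo = 8

      assert(cache.latestEpoch == 1)        // epoch 2 was dropped
      assert(cache.endOffsetFor(1) == 8L)   // latest epoch ends at the log end offset
      assert(cache.endOffsetFor(2) == -1L)  // no longer tracked => undefined
    }
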
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index dc6ff9e..0d479ff 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.mutable.ListBuffer
 
 class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
-  var brokers: Seq[KafkaServer] = null
+  var brokers: ListBuffer[KafkaServer] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
   val t1p0 = new TopicPartition(topic1, 0)
@@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
-    brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
@@ -94,14 +95,13 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic1, Map(
-      0 -> Seq(100),
-      1 -> Seq(101)
-    ))
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic2, Map(
-      0 -> Seq(100)
-    ))
+    brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+
+    val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
+    TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
+
+    val assignment2 = Map(0 -> Seq(100))
+    TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
 
     //Send messages equally to the two partitions, then half as many to a third
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
@@ -139,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
-
     //Setup: we are only interested in the single partition on broker 101
-    brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
+    assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
+
+    brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
+
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
     adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, Map(tp.partition -> Seq(101)))
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
@@ -151,10 +154,9 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
     //Then epoch should be 0 and leo: 1
-    var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-    assertEquals(leo(), offset)
-
+    var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(1, leo())
 
     //2. When broker is bounced
     brokers(1).shutdown()
@@ -163,15 +165,20 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then epoch 0 should still be the start offset of epoch 1
-    offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
 
-    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica)
-    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
+    //No data written in epoch 1
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
 
+    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 -
+    //This is because we have to first change leader to -1 and then change it again to the live replica)
+    //Note that the expected leader changes depend on the controller being on broker 100, which is not restarted
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp)
+    assertEquals(2, epochEndOffset.endOffset)
+    assertEquals(2, leo())
 
     //3. When broker is bounced again
     brokers(1).shutdown()
@@ -180,7 +187,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then Epoch 0 should still map to offset 1
     assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 1c01d62..da1ebbe 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest {
 
     //Stubs
     val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7eb5caf..b425df8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -377,10 +377,11 @@ object TestUtils extends Logging {
               producerId: Long = RecordBatch.NO_PRODUCER_ID,
               producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
               sequence: Int = RecordBatch.NO_SEQUENCE,
-              baseOffset: Long = 0L): MemoryRecords = {
+              baseOffset: Long = 0L,
+              partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
-      System.currentTimeMillis, producerId, producerEpoch, sequence)
+      System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }
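
The new partitionLeaderEpoch parameter lets tests build record batches tagged with an explicit
leader epoch. A hypothetical call (assuming the leading records: Seq[SimpleRecord] parameter
implied by the function body above; the record value is made up):

    import org.apache.kafka.common.record.SimpleRecord

    // Build a batch whose header carries leader epoch 5.
    val batch = TestUtils.records(
      records = Seq(new SimpleRecord("value".getBytes)),
      partitionLeaderEpoch = 5)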

