kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch 2.1 updated: KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while request in flight (#6101)
Date: Wed, 09 Jan 2019 19:16:10 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ec1c3c8  KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while request in flight (#6101)
ec1c3c8 is described below

commit ec1c3c8a272b0fe75041bc901b7f27365a382923
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Wed Jan 9 11:09:48 2019 -0800

    KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while request in flight (#6101)
    
    There is a race condition in ReplicaFetcherThread, where we can update PartitionFetchState
    with the new leader epoch (same leader) before handling the OffsetsForLeaderEpoch response
    with the FENCED_LEADER_EPOCH error. That error causes the partition to be removed from
    partitionStates, which in turn stops fetching until the next LeaderAndIsr.
    
    This patch adds logic to ensure that the leader epoch doesn't change while an OffsetsForLeaderEpoch
    request is in flight (which could happen with back-to-back leader elections). If it has changed,
    we ignore the response.
    
    Also added a toString() implementation to PartitionData, because some log messages did not
    show useful info; I found this while investigating the above system test failure.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../requests/OffsetsForLeaderEpochRequest.java     |  8 ++
 .../requests/OffsetsForLeaderEpochResponse.java    | 10 +++
 .../scala/kafka/server/AbstractFetcherThread.scala | 13 +++-
 .../kafka/server/AbstractFetcherThreadTest.scala   | 85 ++++++++++++++++++++++
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 13 +++-
 6 files changed, 125 insertions(+), 6 deletions(-)
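
A minimal, self-contained Scala sketch of the check described in the commit message above, for
readers skimming the patch: the types FetchState, EpochRequest and EpochEndOffsetData below are
illustrative stand-ins rather than Kafka's actual classes, and filterStaleResponses is a hypothetical
helper; the real change is the filter in core/src/main/scala/kafka/server/AbstractFetcherThread.scala
in the diff that follows.

    object EpochGuardSketch {
      final case class TopicPartition(topic: String, partition: Int)
      final case class FetchState(currentLeaderEpoch: Int)      // stand-in for PartitionFetchState
      final case class EpochRequest(currentLeaderEpoch: Int)    // epoch recorded when the request was built
      final case class EpochEndOffsetData(leaderEpoch: Int, endOffset: Long)

      def filterStaleResponses(
          fetchedEpochs: Map[TopicPartition, EpochEndOffsetData],
          epochRequests: Map[TopicPartition, EpochRequest],
          partitionStates: Map[TopicPartition, FetchState]): Map[TopicPartition, EpochEndOffsetData] = {
        fetchedEpochs.filter { case (tp, _) =>
          // The leader must not reply with partitions that were never requested.
          val requested = epochRequests.getOrElse(tp,
            throw new IllegalStateException(s"Leader replied with partition $tp not requested"))
          // Keep the entry only if the partition is still tracked and its leader epoch has not
          // changed while the OffsetsForLeaderEpoch request was in flight.
          partitionStates.get(tp).exists(_.currentLeaderEpoch == requested.currentLeaderEpoch)
        }
      }
    }

A dropped (stale) entry is simply not truncated against; the partition stays in Truncating state
with its updated epoch and is retried on the next doWork() pass, which is what the new tests in
AbstractFetcherThreadTest.scala verify.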

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index feeb875..57949a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -180,5 +180,13 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
             this.leaderEpoch = leaderEpoch;
         }
 
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
+                append(", leaderEpoch=").append(leaderEpoch).
+                append(")");
+            return bld.toString();
+        }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 6f70850..d5d1265 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -166,4 +166,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         responseStruct.set(TOPICS, topics.toArray());
         return responseStruct;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("(type=OffsetsForLeaderEpochResponse")
+            .append(", throttleTimeMs=").append(throttleTimeMs)
+            .append(", epochEndOffsetsByPartition=").append(epochEndOffsetsByPartition)
+            .append(")");
+        return bld.toString();
+    }
 }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 797b0f5..a78c2a1 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -179,8 +179,17 @@ abstract class AbstractFetcherThread(name: String,
       val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
       //Ensure we hold a lock during truncation.
       inLock(partitionMapLock) {
-        //Check no leadership changes happened whilst we were unlocked, fetching epochs
-        val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
+        //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
+        val leaderEpochs = fetchedEpochs.filter { case (tp, _) =>
+          val curPartitionState = partitionStates.stateValue(tp)
+          val partitionEpochRequest = epochRequests.get(tp).getOrElse {
+            throw new IllegalStateException(
+              s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
+          }
+          val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
+          curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
+        }
+
         val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
         handlePartitionsWithErrors(partitionsWithError)
         updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 77ba934..e9e4e33 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -39,6 +39,7 @@ import org.junit.{Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Set, mutable}
 import scala.util.Random
+import org.scalatest.Assertions.assertThrows
 
 class AbstractFetcherThreadTest {
 
@@ -499,6 +500,90 @@ class AbstractFetcherThreadTest {
     assertEquals(2L, replicaState.logEndOffset)
   }
 
+  @Test
+  def testLeaderEpochChangeDuringFencedFetchEpochsFromLeader(): Unit = {
+    // The leader is on the new epoch when the OffsetsForLeaderEpoch request with the old epoch is sent, so it
+    // returns the fence error. Validate that the response is ignored if the leader epoch changes on
+    // the follower while the OffsetsForLeaderEpoch request is in flight, but that we are able to truncate
+    // and fetch in the next round of "doWork"
+    testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 1)
+  }
+
+  @Test
+  def testLeaderEpochChangeDuringSuccessfulFetchEpochsFromLeader(): Unit = {
+    // The leader is on the old epoch when the OffsetsForLeaderEpoch request with the old epoch is sent
+    // and returns the valid response. Validate that the response is ignored if the leader epoch changes
+    // on the follower while the OffsetsForLeaderEpoch request is in flight, but that we are able to truncate
+    // and fetch once the leader is on the newer epoch (same as the follower)
+    testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 0)
+  }
+
+  private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val initialLeaderEpochOnFollower = 0
+    val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
+
+    val fetcher = new MockFetcherThread {
+      var fetchEpochsFromLeaderOnce = false
+      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val fetchedEpochs = super.fetchEpochsFromLeader(partitions)
+        if (!fetchEpochsFromLeaderOnce) {
+          // leader epoch changes while fetching epochs from leader
+          removePartitions(Set(partition))
+          setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = nextLeaderEpochOnFollower))
+          addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = nextLeaderEpochOnFollower)))
+          fetchEpochsFromLeaderOnce = true
+        }
+        fetchedEpochs
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = initialLeaderEpochOnFollower)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpochOnLeader, highWatermark = 0L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // first round of truncation
+    fetcher.doWork()
+
+    // Since leader epoch changed, fetch epochs response is ignored due to partition being in
+    // truncating state with the updated leader epoch
+    assertEquals(Option(Truncating), fetcher.fetchState(partition).map(_.state))
+    assertEquals(Option(nextLeaderEpochOnFollower), fetcher.fetchState(partition).map(_.currentLeaderEpoch))
+
+    if (leaderEpochOnLeader < nextLeaderEpochOnFollower) {
+      fetcher.setLeaderState(
+        partition, MockFetcherThread.PartitionState(leaderLog, nextLeaderEpochOnFollower, highWatermark = 0L))
+    }
+
+    // make sure the fetcher is now able to truncate and fetch
+    fetcher.doWork()
+    assertEquals(fetcher.leaderPartitionState(partition).log, fetcher.replicaPartitionState(partition).log)
+  }
+
+  @Test
+  def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val unrequestedTp = new TopicPartition("topic2", 0)
+        super.fetchEpochsFromLeader(partitions) + (unrequestedTp -> new EpochEndOffset(0, 0))
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = 0)))
+    fetcher.setLeaderState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+
+    // first round of truncation should throw an exception
+    assertThrows[IllegalStateException] {
+      fetcher.doWork()
+    }
+  }
+
   object MockFetcherThread {
     class PartitionState(var log: mutable.Buffer[RecordBatch],
                          var leaderEpoch: Int,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index c65c254..ef39aa5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -669,7 +669,7 @@ class ReplicaFetcherThreadTest {
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
     val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
-    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t2p1 -> offsetAndEpoch(0L)))
+    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t1p1 -> offsetAndEpoch(0L)))
 
     //Run thread 3 times
     (0 to 3).foreach { _ =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c4ca2cb..5fd7697 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -547,12 +547,13 @@ class ReplicaManagerTest {
     val controllerId = 0
     val controllerEpoch = 0
     var leaderEpoch = 1
+    val leaderEpochIncrement = 2
     val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
     val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
-      topicPartition, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
@@ -563,7 +564,7 @@ class ReplicaManagerTest {
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
-    leaderEpoch += 2
+    leaderEpoch += leaderEpochIncrement
     val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
       controllerId, controllerEpoch,
       collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
@@ -578,7 +579,13 @@ class ReplicaManagerTest {
     EasyMock.verify(mockLogMgr)
   }
 
+  /**
+   * This method assumes that the test using the created ReplicaManager calls
+   * ReplicaManager.becomeLeaderOrFollower() once, with a LeaderAndIsrRequest containing
+   * leader epoch 'leaderEpochInLeaderAndIsr' for partition 'topicPartition'.
+   */
   private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+                                                 leaderEpochInLeaderAndIsr: Int,
                                                  followerBrokerId: Int,
                                                  leaderBrokerId: Int,
                                                  countDownLatch: CountDownLatch,
@@ -671,7 +678,7 @@ class ReplicaManagerTest {
               override def doWork() = {
                 // In case the thread starts before the partition is added by AbstractFetcherManager,
                 // add it here (it's a no-op if already added)
-                val initialOffset = OffsetAndEpoch(offset = 0L, leaderEpoch = 1)
+                val initialOffset = OffsetAndEpoch(offset = 0L, leaderEpoch = leaderEpochInLeaderAndIsr)
                 addPartitions(Map(new TopicPartition(topic, topicPartition) -> initialOffset))
                 super.doWork()
 

