kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Use FetchRequest v8 and ListOffsetRequest v3 in ReplicaFetcherThread (#5342)
Date Fri, 06 Jul 2018 22:03:22 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 38e4be6  MINOR: Use FetchRequest v8 and ListOffsetRequest v3 in ReplicaFetcherThread (#5342)
38e4be6 is described below

commit 38e4be686ccd8906f936c0c010d628956c2844a7
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Fri Jul 6 14:35:25 2018 -0700

    MINOR: Use FetchRequest v8 and ListOffsetRequest v3 in ReplicaFetcherThread (#5342)
    
    These versions are used if inter.broker.protocol.version is 2.0-IV1 or newer. Also
    fixed ListOffsetRequest so that v2 is used, if applicable.
    
    Added a unit test which verifies that we use the latest version of the various
    requests by default. Included a few minor tweaks to make testing easier.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 core/src/main/scala/kafka/api/ApiVersion.scala     |  6 ++---
 .../src/main/scala/kafka/server/QuotaFactory.scala |  3 ++-
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 26 +++++++++++++++-----
 .../kafka/server/ReplicationQuotaManager.scala     |  5 ++--
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 28 ++++++++++++++++++----
 .../kafka/server/ReplicaManagerQuotasTest.scala    | 16 ++++++-------
 .../kafka/server/ReplicationQuotaManagerTest.scala | 16 ++++++-------
 7 files changed, 68 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 9ed6432..5528b52 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -70,9 +70,9 @@ object ApiVersion {
     // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests,
     // and KafkaStorageException for fetch requests.
     KAFKA_1_1_IV0,
-    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over)
     KAFKA_2_0_IV0,
-    // Introduced ApiVersionsRequest V2 via KIP-219
+    // Several request versions were bumped due to KIP-219 (Improve quota communication)
     KAFKA_2_0_IV1
   )
 
@@ -248,4 +248,4 @@ case object KAFKA_2_0_IV1 extends DefaultApiVersion {
   val subVersion = "IV1"
   val recordVersion = RecordVersion.V2
   val id: Int = 16
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 1ee713b..ed04dcf 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -37,7 +37,8 @@ object QuotaFactory extends Logging {
 
   object UnboundedQuota extends ReplicaQuota {
     override def isThrottled(topicPartition: TopicPartition): Boolean = false
-    override def isQuotaExceeded(): Boolean = false
+    override def isQuotaExceeded: Boolean = false
+    def record(value: Long): Unit = ()
   }
 
   case class QuotaManagers(fetch: ClientQuotaManager,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ce6e350..27defd3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -47,7 +47,7 @@ class ReplicaFetcherThread(name: String,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
-                           quota: ReplicationQuotaManager,
+                           quota: ReplicaQuota,
                            leaderEndpointBlockingSend: Option[BlockingSend] = None)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
@@ -63,20 +63,34 @@ class ReplicaFetcherThread(name: String,
   private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
     s"fetcherId=$fetcherId] ")
   this.logIdent = logContext.logPrefix
+
   private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
     new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,
       s"broker-$replicaId-fetcher-$fetcherId", logContext))
-  private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
+
+  // Visible for testing
+  private[server] val fetchRequestVersion: Short =
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
-  private val offsetForLeaderEpochRequestVersion: Short =
+
+  // Visible for testing
+  private[server] val offsetForLeaderEpochRequestVersion: Short =
     if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
     else 0
+
+  // Visible for testing
+  private[server] val listOffsetRequestVersion: Short =
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 2
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1
+    else 0
+
   private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
@@ -242,10 +256,10 @@ class ReplicaFetcherThread(name: String,
   private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
     val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
         val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
-        ListOffsetRequest.Builder.forReplica(1, replicaId).setTargetTimes(partitions.asJava)
+        ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setTargetTimes(partitions.asJava)
       } else {
         val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
-        ListOffsetRequest.Builder.forReplica(0, replicaId).setOffsetData(partitions.asJava)
+        ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setOffsetData(partitions.asJava)
       }
     val clientResponse = leaderEndpoint.sendRequest(requestBuilder)
     val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 84004e3..7835c9d 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -51,8 +51,9 @@ object ReplicationQuotaManagerConfig {
 }
 
 trait ReplicaQuota {
+  def record(value: Long): Unit
   def isThrottled(topicPartition: TopicPartition): Boolean
-  def isQuotaExceeded(): Boolean
+  def isQuotaExceeded: Boolean
 }
 
 object Constants {
@@ -99,7 +100,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
     *
     * @return
     */
-  override def isQuotaExceeded(): Boolean = {
+  override def isQuotaExceeded: Boolean = {
     try {
       sensor().checkQuotas()
     } catch {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index ac5b7ed..fbf7740 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -19,12 +19,13 @@ package kafka.server
 import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
+import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset._
@@ -46,6 +47,25 @@ class ReplicaFetcherThreadTest {
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
 
   @Test
+  def shouldSendLatestRequestVersionsByDefault(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = UnboundedQuota,
+      leaderEndpointBlockingSend = None)
+    assertEquals(ApiKeys.FETCH.latestVersion, thread.fetchRequestVersion)
+    assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, thread.offsetForLeaderEpochRequestVersion)
+    assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, thread.listOffsetRequestVersion)
+  }
+
+  @Test
   def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
@@ -108,7 +128,6 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
     val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
@@ -131,14 +150,15 @@ class ReplicaFetcherThreadTest {
     //Expectations
     expect(partition.truncateTo(anyLong(), anyBoolean())).once
 
-    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+    replay(leaderEpochs, replicaManager, logManager, replica)
 
     //Define the offsets for the OffsetsForLeaderEpochResponse
     val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
 
     //Create the fetcher thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
-    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager,
+      new Metrics, new SystemTime, UnboundedQuota, Some(mockNetwork))
     thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
 
     //Loop 1
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index c6efca5..66a2c8e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -52,8 +52,8 @@ class ReplicaManagerQuotasTest {
     val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
-    expect(quota.isQuotaExceeded()).andReturn(false).once()
-    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    expect(quota.isQuotaExceeded).andReturn(false).once()
+    expect(quota.isQuotaExceeded).andReturn(true).once()
     replay(quota)
 
     val fetch = replicaManager.readFromLocalLog(
@@ -78,8 +78,8 @@ class ReplicaManagerQuotasTest {
     val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
-    expect(quota.isQuotaExceeded()).andReturn(true).once()
-    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    expect(quota.isQuotaExceeded).andReturn(true).once()
+    expect(quota.isQuotaExceeded).andReturn(true).once()
     replay(quota)
 
     val fetch = replicaManager.readFromLocalLog(
@@ -103,8 +103,8 @@ class ReplicaManagerQuotasTest {
     val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
-    expect(quota.isQuotaExceeded()).andReturn(false).once()
-    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded).andReturn(false).once()
+    expect(quota.isQuotaExceeded).andReturn(false).once()
     replay(quota)
 
     val fetch = replicaManager.readFromLocalLog(
@@ -128,8 +128,8 @@ class ReplicaManagerQuotasTest {
     val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
-    expect(quota.isQuotaExceeded()).andReturn(false).once()
-    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    expect(quota.isQuotaExceeded).andReturn(false).once()
+    expect(quota.isQuotaExceeded).andReturn(true).once()
     replay(quota)
 
     val fetch = replicaManager.readFromLocalLog(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 54b506d..b1edd01 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -50,7 +50,7 @@ class ReplicationQuotaManagerTest {
     quota.updateQuota(new Quota(100, true))
 
     //Quota should not be broken when we start
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
 
     //First window is fixed, so we'll skip it
     time.sleep(1000)
@@ -60,24 +60,24 @@ class ReplicationQuotaManagerTest {
     quota.record(1)
 
     //Then it should not break the quota
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
 
     //When we record half the quota (half way through the window), we still should not break
     quota.record(149) //150B, 1.5s
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
 
     //Add a byte to push over quota
     quota.record(1) //151B, 1.5s
 
     //Then it should break the quota
     assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s
-    assertTrue(quota.isQuotaExceeded())
+    assertTrue(quota.isQuotaExceeded)
 
     //When we sleep for the remaining half the window
     time.sleep(500) //151B, 2s
 
     //Then Our rate should have halved (i.e back down below the quota)
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
     assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s
 
     //When we sleep for another half a window (now half way through second window)
@@ -86,14 +86,14 @@ class ReplicationQuotaManagerTest {
 
     //Then the rate should be exceeded again
     assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
     quota.record(1)
-    assertTrue(quota.isQuotaExceeded())
+    assertTrue(quota.isQuotaExceeded)
     assertEquals(251 / 2.5, rate(metrics), 0)
 
     //Sleep for 2 more window
     time.sleep(2 * 1000) //so now at 3.5s
-    assertFalse(quota.isQuotaExceeded())
+    assertFalse(quota.isQuotaExceeded)
     assertEquals(251d / 4.5, rate(metrics), 0)
   }
 


Mime
View raw message