kafka-jira mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [kafka] dajac commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
Date Thu, 10 Dec 2020 16:02:24 GMT

dajac commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540218952



##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -21,8 +21,12 @@
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
-  // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-4",
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 adds Topic ID to the TopicStates.

Review comment:
       The `Type` field is also added in version 5. I would also mention the KIP for reference.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.

Review comment:
       nit: There are two consecutive `and`s.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -31,6 +35,8 @@
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
       "about": "The current broker epoch." },
+    { "name":  "Type", "type":  "int8", "versions": "5+",

Review comment:
       nit: There are two spaces before `Type` and `int8`.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",

Review comment:
       nit: There are two spaces before `Topics` and `[]LeaderAndIsrTopicError`. We could also add a space before `name` to remain consistent with the other fields.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();

Review comment:
       nit: Could we directly allocate the `ArrayList` with the correct capacity? The same applies to `partitions` above and below.
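A minimal sketch of the pre-sizing suggestion. `PartitionState` and `PartitionError` are hypothetical stand-ins for the generated `LeaderAndIsrPartitionState` and `LeaderAndIsrPartitionError` classes, not their real APIs. Passing the known element count to the `ArrayList` constructor lets the backing array be allocated once instead of growing while the list is filled:

```java
import java.util.ArrayList;
import java.util.List;

class PresizedErrors {
    // Hypothetical stand-in for LeaderAndIsrPartitionState.
    static final class PartitionState {
        final String topicName;
        final int partitionIndex;

        PartitionState(String topicName, int partitionIndex) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
        }
    }

    // Hypothetical stand-in for LeaderAndIsrPartitionError.
    static final class PartitionError {
        final String topicName;
        final int partitionIndex;
        final short errorCode;

        PartitionError(String topicName, int partitionIndex, short errorCode) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    static List<PartitionError> errorsFor(List<PartitionState> states, short errorCode) {
        // The final size is known up front, so size the backing array once.
        List<PartitionError> partitions = new ArrayList<>(states.size());
        for (PartitionState state : states) {
            partitions.add(new PartitionError(state.topicName, state.partitionIndex, errorCode));
        }
        return partitions;
    }
}
```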

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();
+        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
+            topicError.setTopicId(topicIds().get(topicState.topicName()));

Review comment:
       `topicIds()` recomputes the `Map` on every call, so it would be better to compute it once before the loop and keep a local reference to it.
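A sketch of the hoisting suggestion. It assumes a `topicIds()` that, like the one under review, builds a new `Map` on each call. The class name, the `buildCount` counter (present only to make the number of rebuilds observable), and the use of `java.util.UUID` in place of Kafka's `Uuid` are all assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class HoistedTopicIds {
    private final Map<String, UUID> ids;
    int buildCount = 0; // how many times topicIds() rebuilt its map

    HoistedTopicIds(Map<String, UUID> ids) {
        this.ids = ids;
    }

    // Hypothetical: returns a fresh copy on every call, mirroring a method that recomputes its result.
    Map<String, UUID> topicIds() {
        buildCount++;
        return new HashMap<>(ids);
    }

    List<UUID> idsForTopics(List<String> topicNames) {
        // Build the map once and reuse the local reference instead of calling topicIds() per topic.
        Map<String, UUID> topicIds = topicIds();
        List<UUID> result = new ArrayList<>(topicNames.size());
        for (String name : topicNames) {
            result.add(topicIds.get(name));
        }
        return result;
    }
}
```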

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -116,8 +118,13 @@ public void testVersionLogic() {
                 new Node(0, "host0", 9090),
                 new Node(1, "host1", 9091)
             );
+
+            HashMap<String, Uuid> topicIds = new HashMap<>();

Review comment:
       nit: Declare this as `Map<String, Uuid>` rather than `HashMap<String, Uuid>`. I have seen this in a couple of places.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -45,8 +47,16 @@ public LeaderAndIsrResponse(Struct struct, short version) {
         this.data = new LeaderAndIsrResponseData(struct, version);
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (data.topics().isEmpty()) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());

Review comment:
       It would be better to handle the version explicitly here instead of relying on whether `topics()` is empty. It is easier for the reader to reason about, and it makes the handling explicit rather than implicit.
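A sketch of version-explicit handling, under a few assumptions. `PartitionError` and `TopicError` are hypothetical stand-ins for the generated message classes. The response keeps its version in a field. A Java stream replaces `FlattenedIterator`, so the flattened partitions are materialized into a list rather than iterated lazily. Because the branch is picked from the version, both layouts are visible to the reader:

```java
import java.util.List;
import java.util.stream.Collectors;

class VersionedPartitions {
    // Hypothetical stand-in for LeaderAndIsrPartitionError.
    static final class PartitionError {
        final int partitionIndex;
        final short errorCode;

        PartitionError(int partitionIndex, short errorCode) {
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    // Hypothetical stand-in for LeaderAndIsrTopicError (topic ID omitted for brevity).
    static final class TopicError {
        final List<PartitionError> partitionErrors;

        TopicError(List<PartitionError> partitionErrors) {
            this.partitionErrors = partitionErrors;
        }
    }

    private final short version;
    private final List<PartitionError> partitionErrors; // top-level list, used by v0-v4
    private final List<TopicError> topics;              // per-topic list, used by v5+

    VersionedPartitions(short version, List<PartitionError> partitionErrors, List<TopicError> topics) {
        this.version = version;
        this.partitionErrors = partitionErrors;
        this.topics = topics;
    }

    List<PartitionError> partitions() {
        if (version < 5) {
            // v0-v4: partitions are a flat top-level list.
            return partitionErrors;
        }
        // v5+: partitions are nested under their topic, so flatten them.
        return topics.stream()
            .flatMap(topic -> topic.partitionErrors.stream())
            .collect(Collectors.toList());
    }
}
```

A lazy `Iterable`, as in the code under review, avoids the intermediate list. The list form just keeps the sketch short.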

##########
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##########
@@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           _.node(config.interBrokerListenerName)
         }
         val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+        val topicIds = leaderAndIsrPartitionStates.keys
+          .map(_.topic)
+          .toSet
+          .map((topic: String) => (topic, controllerContext.topicIds(topic)))

Review comment:
       nit: `(topic: String)` -> `topic`. We rarely specify the type in lambdas.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());
+
         LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
-                15, 20, 0, partitionStates, Collections.emptySet()).build();
+                15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build();
         LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), response.errorCounts());
     }
 
     @Test
     public void testErrorCountsWithTopLevelError() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+        Uuid id = Uuid.randomUuid();
+        List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
         LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
             .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
-            .setPartitionErrors(partitions));
+            .setTopics(topics));
         assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), response.errorCounts());

Review comment:
       Should we keep testing the older versions as well? The tests now only cover the newest version.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
     assertEquals(1, updateMetadataRequests.size)
 
     val leaderAndIsrRequest = leaderAndIsrRequests.head
+    val topicIds = leaderAndIsrRequest.topicIds();
+    val topicNames = topicIds.asScala.map{ case (k,v) => (v, k)}

Review comment:
       nit: We usually put a space before `{` and `}` when we use curly braces inline. We could also add a space after the comma.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
     val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
     assertEquals(2, brokerId)
     assertEquals(partitions.keySet,
-      leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
+      leaderAndIsrResponse.topics.asScala.map(t => t.partitionErrors.asScala.map(p =>
        new TopicPartition(topicNames.get(t.topicId).get, p.partitionIndex))).flatMap(f => f).toSet)

Review comment:
       * `topicNames.get(t.topicId).get` -> `topicNames(t.topicId)`. It is a bit more concise when you know that the map contains what you are looking up.
       * `flatMap(f => f)` looks weird. I suppose that we could use `flatMap` instead of the first `map`.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -818,15 +825,18 @@ class ControllerChannelManagerTest {
   private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
     sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
       val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
-      val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p =>
-        new LeaderAndIsrPartitionError()
-          .setTopicName(p.topicName)
-          .setPartitionIndex(p.partitionIndex)
-          .setErrorCode(error.code))
+      val topicIds = leaderAndIsrRequest.topicIds()
+      val topicErrors = leaderAndIsrRequest.data.topicStates().asScala.map(t =>

Review comment:
       nit: The parentheses after `topicStates`, `partitionStates`, and `partitionIndex` are not mandatory. We tend to omit them when they are not required.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",
+      "about": "Each topic", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
+      { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",

Review comment:
       Should the version of `PartitionErrors` be `5+`? 

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -51,15 +53,15 @@
     public void testUnsupportedVersion() {
         LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
                 (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
-                Collections.emptyList(), Collections.emptySet());
+                Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
         assertThrows(UnsupportedVersionException.class, builder::build);
     }
 
     @Test
     public void testGetErrorResponse() {
         for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
             LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
-                    Collections.emptyList(), Collections.emptySet());
+                    Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());

Review comment:
       Shouldn't we verify that topic ids are correctly set in the generated response as well?
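One possible shape for that check, sketched with a hypothetical `TopicError` holding a `java.util.UUID` (standing in for Kafka's `Uuid`). It collects the topic IDs present in the generated error response and compares them with the IDs the request was built from. A `null` ID in the response makes the sets differ, so a failed lookup would be caught. This assumes the test builds the request with non-empty topic states and topic IDs, which the quoted loop currently does not, since it passes `Collections.emptyList()` and `Collections.emptyMap()`:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

class TopicIdCheck {
    // Hypothetical stand-in for LeaderAndIsrTopicError.
    static final class TopicError {
        final UUID topicId;

        TopicError(UUID topicId) {
            this.topicId = topicId;
        }
    }

    // True when the response carries exactly the topic IDs the request was built with.
    static boolean idsMatch(Map<String, UUID> requestedIds, List<TopicError> responseTopics) {
        Set<UUID> responseIds = responseTopics.stream()
            .map(topic -> topic.topicId)
            .collect(Collectors.toSet());
        return responseIds.equals(new HashSet<>(requestedIds.values()));
    }
}
```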

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)
             // Minor optimization since the top-level error applies to all partitions
-            return Collections.singletonMap(error, data.partitionErrors().size());
-        return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+            if (data.topics().isEmpty()) {
+                return Collections.singletonMap(error, data.partitionErrors().size());
+            } else {
+                return Collections.singletonMap(error,
+                        data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum());
+            }
+        if (data.topics().isEmpty()) {
+            return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+        }
+        return errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
+                Errors.forCode(l.errorCode())));

Review comment:
       Ditto here. It would be better to handle the version explicitly.
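A sketch of `errorCounts()` with the version handled explicitly. It uses hypothetical stand-ins for the message classes and raw `short` error codes instead of the `Errors` enum. Both the top-level-error shortcut and the per-partition count go through one version-aware helper, so the layout decision is made in a single place:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

class VersionedErrorCounts {
    static final short NONE = 0;

    // Hypothetical stand-in for LeaderAndIsrPartitionError.
    static final class PartitionError {
        final short errorCode;

        PartitionError(short errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Hypothetical stand-in for LeaderAndIsrTopicError.
    static final class TopicError {
        final List<PartitionError> partitionErrors;

        TopicError(List<PartitionError> partitionErrors) {
            this.partitionErrors = partitionErrors;
        }
    }

    private final short version;
    private final short errorCode;                      // top-level error
    private final List<PartitionError> partitionErrors; // v0-v4 layout
    private final List<TopicError> topics;              // v5+ layout

    VersionedErrorCounts(short version, short errorCode,
                         List<PartitionError> partitionErrors, List<TopicError> topics) {
        this.version = version;
        this.errorCode = errorCode;
        this.partitionErrors = partitionErrors;
        this.topics = topics;
    }

    // The only place that picks the layout, based on the version.
    private Stream<PartitionError> allPartitions() {
        if (version < 5)
            return partitionErrors.stream();
        return topics.stream().flatMap(topic -> topic.partitionErrors.stream());
    }

    Map<Short, Integer> errorCounts() {
        if (errorCode != NONE) {
            // The top-level error applies to every partition.
            return Collections.singletonMap(errorCode, (int) allPartitions().count());
        }
        Map<Short, Integer> counts = new HashMap<>();
        allPartitions().forEach(p -> counts.merge(p.errorCode, 1, Integer::sum));
        return counts;
    }
}
```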

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.IsolationLevel;

Review comment:
       It would be good to verify that all versions are tested in `testSerialization`.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());

Review comment:
       nit: You could use `Collections.singletonMap` here.
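`Collections.singletonMap` returns an immutable one-entry map, so it only fits when the test never adds entries afterwards. Here is a small sketch, with `java.util.UUID` standing in for Kafka's `Uuid` and a hypothetical method name. Declaring the return type as `Map` also follows the suggestion above to use the interface type:

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

class SingletonTopicIds {
    // Hypothetical helper: an immutable, one-entry topic-name-to-ID map for a test.
    static Map<String, UUID> fooTopicIds() {
        return Collections.singletonMap("foo", UUID.randomUUID());
    }
}
```

Calling `put` on the returned map throws `UnsupportedOperationException`.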

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV1) 4
+        if (apiVersion >= KAFKA_2_8_IV0) 5

Review comment:
       I wonder if we should extend `testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion` to verify the topic IDs for each of the supported versions. What do you think?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Do we need to handle the case where the topic no longer exists when the response is received? If not, we could use `controllerContext.topicNames(topic.topicId)`.
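If the topic can indeed be deleted before the response arrives, `topicNames(topic.topicId)` would throw `NoSuchElementException` for the missing key, just as `.get` on an empty `Option` does. Below is a Java sketch of a tolerant alternative (the controller itself is Scala). `TopicError` and the `"topic-partition"` strings are hypothetical stand-ins for the response classes and `TopicPartition`. Unknown IDs are simply skipped:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class ResolveTopicNames {
    // Hypothetical stand-in for LeaderAndIsrTopicError: a topic ID plus its partition indexes.
    static final class TopicError {
        final UUID topicId;
        final List<Integer> partitions;

        TopicError(UUID topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // Resolves IDs to "topic-partition" strings, skipping IDs that are no longer known.
    static List<String> resolve(Map<UUID, String> topicNames, List<TopicError> topics) {
        List<String> resolved = new ArrayList<>();
        for (TopicError topic : topics) {
            String name = topicNames.get(topic.topicId);
            if (name == null) {
                // Presumably deleted after the request was sent; nothing to update.
                continue;
            }
            for (int partition : topic.partitions) {
                resolved.add(name + "-" + partition);
            }
        }
        return resolved;
    }
}
```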

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       It may be better to handle the version explicitly here as well. Alternatively, we could push this into `LeaderAndIsrResponse` and provide a method `Map<TopicPartition,
...> partitions()` that handles the version.
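If `LeaderAndIsrResponse` exposed such a map, the controller loop would no longer need to branch on the layout. Below is a sketch of the consuming side. It assumes the map's value type is an error (the review leaves it as `...`), uses a hypothetical `ErrorKind` enum in place of `Errors`, and uses `"topic-partition"` strings in place of `TopicPartition`. The classification mirrors the original loop: `KAFKA_STORAGE_ERROR` marks a replica offline, `NONE` marks it online, and any other error puts it in neither list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ReplicaClassifier {
    // Hypothetical stand-in for the relevant values of Errors.
    enum ErrorKind { NONE, KAFKA_STORAGE_ERROR, NOT_LEADER_OR_FOLLOWER }

    final List<String> onlineReplicas = new ArrayList<>();
    final List<String> offlineReplicas = new ArrayList<>();

    // partitionErrors is assumed to come from a version-aware method on the response that has
    // already resolved topic IDs to names; keys are "topic-partition" strings.
    void classify(Map<String, ErrorKind> partitionErrors) {
        partitionErrors.forEach((tp, error) -> {
            if (error == ErrorKind.KAFKA_STORAGE_ERROR)
                offlineReplicas.add(tp);
            else if (error == ErrorKind.NONE)
                onlineReplicas.add(tp);
            // Any other error leaves the replica in neither list, as in the original loop.
        });
    }
}
```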




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


