kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
Date Mon, 16 Apr 2018 06:23:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439027#comment-16439027 ]

ASF GitHub Bot commented on KAFKA-6788:
---------------------------------------

cyrusv closed pull request #4875: KAFKA-6788: Grouping consumer requests per consumer coordinator in admin client
URL: https://github.com/apache/kafka/pull/4875
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..8fd92572586 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
             }
         }
 
-        // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
-        // all consumer groups this coordinator host
+        final Set<String> describedGroupIds = new HashSet<>();
+
         for (final Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) {
             // skip sending request for those futures that already failed.
             if (entry.getValue().isCompletedExceptionally())
                 continue;
 
             final String groupId = entry.getKey();
+            if (describedGroupIds.contains(groupId)) {
+                continue;
+            }
 
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
@@ -2274,61 +2277,83 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    final long nowDescribeConsumerGroups = time.milliseconds();
+                    final long nowListConsumerGroups = time.milliseconds();
 
                     final int nodeId = response.node().id();
 
-                    runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {

+                    runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+                            return new ListGroupsRequest.Builder();
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
-                            final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+                            final ListGroupsResponse listResponse = (ListGroupsResponse) abstractResponse;
+                            final long nowDescribeConsumerGroups = time.milliseconds();
 
-                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                            final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+                            final Set<String> groupIdsToDescribe = new HashSet<>();
+                            for (ListGroupsResponse.Group group : listResponse.groups()) {
+                                groupIdsToDescribe.add(group.groupId());
+                            }
 
-                            final Errors groupError = groupMetadata.error();
-                            if (groupError != Errors.NONE) {
-                                // TODO: KAFKA-6789, we can retry based on the error code
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                final String protocolType = groupMetadata.protocolType();
-                                if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-                                    final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
-                                    final List<MemberDescription> consumers = new ArrayList<>(members.size());
-
-                                    for (DescribeGroupsResponse.GroupMember groupMember : members) {
-                                        final PartitionAssignor.Assignment assignment =
-                                                ConsumerProtocol.deserializeAssignment(
-                                                        ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
-                                        final MemberDescription memberDescription =
-                                                new MemberDescription(
-                                                        groupMember.memberId(),
-                                                        groupMember.clientId(),
-                                                        groupMember.clientHost(),
-                                                        new MemberAssignment(assignment.partitions()));
-                                        consumers.add(memberDescription);
+                            runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                                @Override
+                                AbstractRequest.Builder createRequest(int timeoutMs) {
+                                    return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+                                }
+
+                                @Override
+                                void handleResponse(AbstractResponse abstractResponse) {
+                                    final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+                                    for (String describedCandidate : groupIdsToDescribe) {
+                                        if (response.groups().containsKey(describedCandidate)) {
+                                            describedGroupIds.add(describedCandidate);
+                                        }
+                                    }
+
+                                    KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+                                    final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+
+                                    final Errors groupError = groupMetadata.error();
+                                    if (groupError != Errors.NONE) {
+                                        // TODO: KAFKA-6789, we can retry based on the error code
+                                        future.completeExceptionally(groupError.exception());
+                                    } else {
+                                        final String protocolType = groupMetadata.protocolType();
+                                        if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                                            final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+                                            final List<MemberDescription> consumers = new ArrayList<>(members.size());
+
+                                            for (DescribeGroupsResponse.GroupMember groupMember : members) {
+                                                final PartitionAssignor.Assignment assignment =
+                                                        ConsumerProtocol.deserializeAssignment(
+                                                                ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
+
+                                                final MemberDescription memberDescription =
+                                                        new MemberDescription(
+                                                                groupMember.memberId(),
+                                                                groupMember.clientId(),
+                                                                groupMember.clientHost(),
+                                                                new MemberAssignment(assignment.partitions()));
+                                                consumers.add(memberDescription);
+                                            }
+                                            final String protocol = groupMetadata.protocol();
+                                            final ConsumerGroupDescription consumerGroupDescription =
+                                                    new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                            future.complete(consumerGroupDescription);
+                                        }
                                     }
-                                    final String protocol = groupMetadata.protocol();
-                                    final ConsumerGroupDescription consumerGroupDescription =
-                                            new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
-                                    future.complete(consumerGroupDescription);
                                 }
-                            }
-                        }
 
-                        @Override
-                        void handleFailure(Throwable throwable) {
-                            KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
-                            future.completeExceptionally(throwable);
-                        }
-                    }, nowDescribeConsumerGroups);
+                                @Override
+                                void handleFailure(Throwable throwable) {
+                                    KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
+                                    future.completeExceptionally(throwable);
+                                }
+                            }, nowDescribeConsumerGroups);
                 }
 
                 @Override
@@ -2548,13 +2573,18 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI
             }
         }
 
-        // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of
-        // all consumer groups this coordinator host
+        final Set<String> deletedGroupIds = new HashSet<>();
+
         for (final String groupId : groupIds) {
             // skip sending request for those futures that already failed.
             if (futures.get(groupId).isCompletedExceptionally())
                 continue;
 
+            // Skip ones we deleted on the same coordinator
+            if (deletedGroupIds.contains(groupId)) {
+                continue;
+            }
+
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
 
@@ -2568,29 +2598,60 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    final long nowDeleteConsumerGroups = time.milliseconds();
-
+                    final long nowListGroups = time.milliseconds();
                     final int nodeId = response.node().id();
 
-                    runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
-
+                    runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
+                            return new ListGroupsRequest.Builder();
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
-                            final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
-
-                            KafkaFutureImpl<Void> future = futures.get(groupId);
-                            final Errors groupError = response.get(groupId);
+                            final ListGroupsResponse listResponse = (ListGroupsResponse) abstractResponse;
+                            final long nowDeleteConsumerGroups = time.milliseconds();
 
-                            if (groupError != Errors.NONE) {
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                future.complete(null);
+                            final Set<String> groupIdsToDelete = new HashSet<>();
+                            for (ListGroupsResponse.Group group : listResponse.groups()) {
+                                groupIdsToDelete.add(group.groupId());
                             }
+                            runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                                @Override
+                                AbstractRequest.Builder createRequest(int timeoutMs) {
+
+
+                                    return new DeleteGroupsRequest.Builder(groupIdsToDelete);
+                                }
+
+                                @Override
+                                void handleResponse(AbstractResponse abstractResponse) {
+                                    final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+
+                                    // If we submitted it and it wasn't an error
+                                    for (final String delCandidateGroupId : groupIdsToDelete) {
+                                        if (!response.errors().containsKey(delCandidateGroupId)) {
+                                            deletedGroupIds.add(delCandidateGroupId);
+                                        }
+                                    }
+
+                                    KafkaFutureImpl<Void> future = futures.get(groupId);
+                                    final Errors groupError = response.get(groupId);
+
+                                    if (groupError != Errors.NONE) {
+                                        future.completeExceptionally(groupError.exception());
+                                    } else {
+                                        future.complete(null);
+                                    }
+                                }
+
+                                @Override
+                                void handleFailure(Throwable throwable) {
+                                    KafkaFutureImpl<Void> future = futures.get(groupId);
+                                    future.completeExceptionally(throwable);
+                                }
+                            }, nowDeleteConsumerGroups);
                         }
 
                         @Override
@@ -2598,7 +2659,7 @@ void handleFailure(Throwable throwable) {
                             KafkaFutureImpl<Void> future = futures.get(groupId);
                             future.completeExceptionally(throwable);
                         }
-                    }, nowDeleteConsumerGroups);
+                    }, nowListGroups);
                 }
 
                 @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2789b62621..be44b37496f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -867,6 +867,15 @@ public void testDeleteConsumerGroups() throws Exception {
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
+            env.kafkaClient().prepareResponse(
+                new ListGroupsResponse(
+                    Errors.NONE,
+                    Arrays.asList(
+                        new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
+                        new ListGroupsResponse.Group("group-connect-1", "connector")
+                    )));
+
+
             final Map<String, Errors> response = new HashMap<>();
             response.put("group-0", Errors.NONE);
             env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Grouping consumer requests per consumer coordinator in admin client
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6788
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we will first
> try to get the coordinator for each requested group id, and then send the corresponding request
> for that group id. However, different group ids could be hosted on the same coordinator, and
> these requests do support multiple group ids being sent within the same request, so we can
> consider optimizing this by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
