From commits-return-9353-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Apr 11 23:19:24 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C03C818064A for ; Wed, 11 Apr 2018 23:19:22 +0200 (CEST) Received: (qmail 8934 invoked by uid 500); 11 Apr 2018 21:19:21 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 8925 invoked by uid 99); 11 Apr 2018 21:19:21 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Apr 2018 21:19:21 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E2E668081D; Wed, 11 Apr 2018 21:19:20 +0000 (UTC) Date: Wed, 11 Apr 2018 21:19:20 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6058: KIP-222; Add Consumer Group operations to Admin API MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152348156038.3303.1621304681865793001@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 47918f2d79e907f6a6da599ab82a97c169722229 X-Git-Newrev: 6a99da87abff26a749cee4e765d125bec8b6c424 X-Git-Rev: 6a99da87abff26a749cee4e765d125bec8b6c424 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6a99da8 KAFKA-6058: KIP-222; Add Consumer Group operations to Admin API 6a99da8 is described below commit 6a99da87abff26a749cee4e765d125bec8b6c424 Author: Jorge Quilcate Otoya AuthorDate: Wed Apr 11 14:17:46 2018 -0700 KAFKA-6058: KIP-222; Add Consumer Group operations to Admin API KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API Author: Jorge Quilcate Otoya Author: Jorge Esteban Quilcate Otoya Author: Guozhang Wang Reviewers: Colin Patrick McCabe , Guozhang Wang Closes #4454 from jeqo/feature/admin-client-describe-consumer-group --- checkstyle/checkstyle.xml | 4 +- checkstyle/import-control.xml | 2 + .../apache/kafka/clients/admin/AdminClient.java | 79 +++++ .../clients/admin/ConsumerGroupDescription.java | 104 +++++++ .../kafka/clients/admin/ConsumerGroupListing.java | 59 ++++ .../clients/admin/DeleteConsumerGroupsOptions.java | 31 ++ .../clients/admin/DeleteConsumerGroupsResult.java | 41 +++ .../admin/DescribeConsumerGroupsOptions.java | 31 ++ .../admin/DescribeConsumerGroupsResult.java | 47 +++ .../kafka/clients/admin/KafkaAdminClient.java | 336 +++++++++++++++++++++ .../admin/ListConsumerGroupOffsetsOptions.java | 53 ++++ .../admin/ListConsumerGroupOffsetsResult.java | 48 +++ .../clients/admin/ListConsumerGroupsOptions.java | 29 ++ .../clients/admin/ListConsumerGroupsResult.java | 44 +++ .../kafka/clients/admin/MemberAssignment.java | 65 ++++ .../kafka/clients/admin/MemberDescription.java | 98 ++++++ .../java/org/apache/kafka/common/KafkaFuture.java | 9 + .../kafka/clients/admin/KafkaAdminClientTest.java | 192 ++++++++++++ .../kafka/clients/admin/MockAdminClient.java | 20 ++ 19 files changed, 1290 insertions(+), 2 deletions(-) diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index ccab85c..ad85450 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -105,7 +105,7 @@ - + @@ -114,7 +114,7 @@ - + diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 000acc3..192d735 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -164,6 +164,8 @@ + + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 53b77ce..0171b61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -689,4 +689,83 @@ public abstract class AdminClient implements AutoCloseable { */ public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options); + /** + * Describe some group IDs in the cluster. + * + * @param groupIds The IDs of the groups to describe. + * @param options The options to use when describing the groups. + * @return The DescribeConsumerGroupResult. + */ + public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, + DescribeConsumerGroupsOptions options); + + /** + * Describe some group IDs in the cluster, with the default options. + *

+ * This is a convenience method for + * #{@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with + * default options. See the overload for more details. + * + * @param groupIds The IDs of the groups to describe. + * @return The DescribeConsumerGroupResult. + */ + public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds) { + return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions()); + } + + /** + * List the consumer groups available in the cluster. + * + * @param options The options to use when listing the consumer groups. + * @return The ListGroupsResult. + */ + public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options); + + /** + * List the consumer groups available in the cluster with the default options. + * + * This is a convenience method for #{@link AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options. + * See the overload for more details. + * + * @return The ListGroupsResult. + */ + public ListConsumerGroupsResult listConsumerGroups() { + return listConsumerGroups(new ListConsumerGroupsOptions()); + } + + /** + * List the consumer group offsets available in the cluster. + * + * @param options The options to use when listing the consumer group offsets. + * @return The ListGroupOffsetsResult + */ + public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options); + + /** + * List the consumer group offsets available in the cluster with the default options. + * + * This is a convenience method for #{@link AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options. + * + * @return The ListGroupOffsetsResult. + */ + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { + return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions()); + } + + /** + * Delete consumer groups from the cluster. + * + * @param options The options to use when deleting a consumer group. + * @return The DeletConsumerGroupResult. + */ + public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options); + + /** + * Delete consumer groups from the cluster with the default options. + * + * @return The DeleteConsumerGroupResult. + */ + public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds) { + return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java new file mode 100644 index 0000000..0bfa8a7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.utils.Utils; + +import java.util.List; + +/** + * A detailed description of a single consumer group in the cluster. + */ +public class ConsumerGroupDescription { + + private final String groupId; + private final boolean isSimpleConsumerGroup; + private final List members; + private final String partitionAssignor; + + /** + * Creates an instance with the specified parameters. + * + * @param groupId The consumer group id + * @param isSimpleConsumerGroup If Consumer Group is simple + * @param members The consumer group members + * @param partitionAssignor The consumer group partition assignor + */ + public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List members, String partitionAssignor) { + this.groupId = groupId; + this.isSimpleConsumerGroup = isSimpleConsumerGroup; + this.members = members; + this.partitionAssignor = partitionAssignor; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConsumerGroupDescription that = (ConsumerGroupDescription) o; + + if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false; + if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false; + if (members != null ? !members.equals(that.members) : that.members != null) return false; + return partitionAssignor != null ? partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == null; + } + + @Override + public int hashCode() { + int result = groupId != null ? groupId.hashCode() : 0; + result = 31 * result + (isSimpleConsumerGroup ? 1 : 0); + result = 31 * result + (members != null ? members.hashCode() : 0); + result = 31 * result + (partitionAssignor != null ? partitionAssignor.hashCode() : 0); + return result; + } + + /** + * The id of the consumer group. + */ + public String groupId() { + return groupId; + } + + /** + * If consumer group is simple or not. + */ + public boolean isSimpleConsumerGroup() { + return isSimpleConsumerGroup; + } + + /** + * A list of the members of the consumer group. + */ + public List members() { + return members; + } + + /** + * The consumer group partition assignor. + */ + public String partitionAssignor() { + return partitionAssignor; + } + + @Override + public String toString() { + return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", members=" + + Utils.join(members, ",") + ", partitionAssignor=" + partitionAssignor + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java new file mode 100644 index 0000000..46da962 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * A listing of a consumer group in the cluster. + */ +public class ConsumerGroupListing { + private final String groupId; + private final boolean isSimpleConsumerGroup; + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id + * @param isSimpleConsumerGroup If consumer group is simple or not. + */ + public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) { + this.groupId = groupId; + this.isSimpleConsumerGroup = isSimpleConsumerGroup; + } + + /** + * Consumer Group Id + */ + public String groupId() { + return groupId; + } + + /** + * If Consumer Group is simple or not. + */ + public boolean isSimpleConsumerGroup() { + return isSimpleConsumerGroup; + } + + @Override + public String toString() { + return "(" + + "groupId='" + groupId + '\'' + + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + + ')'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java new file mode 100644 index 0000000..cd505f4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for the {@link AdminClient#deleteConsumerGroups(Collection)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DeleteConsumerGroupsOptions extends AbstractOptions { + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java new file mode 100644 index 0000000..b4bce26 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + +/** + * The result of the {@link AdminClient#deleteConsumerGroups(Collection)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DeleteConsumerGroupsResult { + final KafkaFuture>> futures; + + DeleteConsumerGroupsResult(KafkaFuture>> futures) { + this.futures = futures; + } + + public KafkaFuture>> deletedGroups() { + return futures; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java new file mode 100644 index 0000000..7daff1a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}. + *

+ * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DescribeConsumerGroupsOptions extends AbstractOptions { +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java new file mode 100644 index 0000000..adde031 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + + +/** + * The result of the {@link KafkaAdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class DescribeConsumerGroupsResult { + + private final KafkaFuture>> futures; + + public DescribeConsumerGroupsResult(KafkaFuture>> futures) { + this.futures = futures; + } + + /** + * Return a map from group name to futures which can be used to check the description of a consumer group. + */ + public KafkaFuture>> describedGroups() { + return futures; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 3ac0e28..50bcfd3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -27,6 +27,9 @@ import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -79,6 +82,8 @@ import org.apache.kafka.common.requests.DeleteAclsRequest; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.requests.DeleteGroupsRequest; +import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsRequest; import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsRequest; @@ -97,6 +102,14 @@ import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RenewDelegationTokenRequest; import org.apache.kafka.common.requests.RenewDelegationTokenResponse; +import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ListGroupsRequest; +import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.Resource; import org.apache.kafka.common.requests.ResourceType; import org.apache.kafka.common.security.token.delegation.DelegationToken; @@ -105,9 +118,11 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -2209,4 +2224,325 @@ public class KafkaAdminClient extends AdminClient { return new DescribeDelegationTokenResult(tokensFuture); } + + @Override + public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, + final DescribeConsumerGroupsOptions options) { + final KafkaFutureImpl>> resultFutures = new KafkaFutureImpl<>(); + final Map> consumerGroupFutures = new HashMap<>(groupIds.size()); + final ArrayList groupIdList = new ArrayList<>(); + for (String groupId : groupIds) { + if (!consumerGroupFutures.containsKey(groupId)) { + consumerGroupFutures.put(groupId, new KafkaFutureImpl()); + groupIdList.add(groupId); + } + } + + for (final String groupId : groupIdList) { + + final long nowFindCoordinator = time.milliseconds(); + final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + + runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + + final long nowDescribeConsumerGroups = time.milliseconds(); + + final int nodeId = response.node().id(); + + runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeGroupsRequest.Builder(groupIdList); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + // Handle server responses for particular groupId. + for (Map.Entry> entry : consumerGroupFutures.entrySet()) { + final String groupId = entry.getKey(); + final KafkaFutureImpl future = entry.getValue(); + final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); + final Errors groupError = groupMetadata.error(); + if (groupError != Errors.NONE) { + future.completeExceptionally(groupError.exception()); + continue; + } + + final String protocolType = groupMetadata.protocolType(); + if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { + final List members = groupMetadata.members(); + final List consumers = new ArrayList<>(members.size()); + + for (DescribeGroupsResponse.GroupMember groupMember : members) { + final PartitionAssignor.Assignment assignment = + ConsumerProtocol.deserializeAssignment( + ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); + + final MemberDescription memberDescription = + new MemberDescription( + groupMember.memberId(), + groupMember.clientId(), + groupMember.clientHost(), + new MemberAssignment(assignment.partitions())); + consumers.add(memberDescription); + } + final String protocol = groupMetadata.protocol(); + final ConsumerGroupDescription consumerGroupDescription = + new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol); + future.complete(consumerGroupDescription); + } + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(consumerGroupFutures.values(), throwable); + } + }, nowDescribeConsumerGroups); + + resultFutures.complete(new HashMap>(consumerGroupFutures)); + } + + @Override + void handleFailure(Throwable throwable) { + resultFutures.completeExceptionally(throwable); + } + }, nowFindCoordinator); + } + + return new DescribeConsumerGroupsResult(resultFutures); + } + + @Override + public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { + //final KafkaFutureImpl>>> nodeAndConsumerGroupListing = new KafkaFutureImpl<>(); + final KafkaFutureImpl> future = new KafkaFutureImpl>(); + + final long nowMetadata = time.milliseconds(); + final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); + + runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(Collections.emptyList(), true); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse metadataResponse = (MetadataResponse) abstractResponse; + + final Map>> futures = new HashMap<>(); + + for (final Node node : metadataResponse.brokers()) { + futures.put(node, new KafkaFutureImpl>()); + } + + future.combine(futures.values().toArray(new KafkaFuture[0])).thenApply( + new KafkaFuture.BaseFunction, Collection>() { + @Override + public Collection apply(Collection v) { + List listings = new ArrayList<>(); + for (Map.Entry>> entry : futures.entrySet()) { + Collection results; + try { + results = entry.getValue().get(); + } catch (Throwable e) { + // This should be unreachable, since the future returned by KafkaFuture#allOf should + // have failed if any Future failed. + throw new KafkaException("ListConsumerGroupsResult#listings(): internal error", e); + } + listings.addAll(results); + } + return listings; + } + }); + + + for (final Map.Entry>> entry : futures.entrySet()) { + final long nowList = time.milliseconds(); + + final int brokerId = entry.getKey().id(); + + runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) { + + private final KafkaFutureImpl> future = entry.getValue(); + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new ListGroupsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final ListGroupsResponse response = (ListGroupsResponse) abstractResponse; + final List groupsListing = new ArrayList<>(); + for (ListGroupsResponse.Group group : response.groups()) { + if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) { + final String groupId = group.groupId(); + final String protocolType = group.protocolType(); + final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty()); + groupsListing.add(groupListing); + } + } + future.complete(groupsListing); + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, nowList); + + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, nowMetadata); + + return new ListConsumerGroupsResult(future); + } + + @Override + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { + final KafkaFutureImpl> groupOffsetListingFuture = new KafkaFutureImpl<>(); + + final long nowFindCoordinator = time.milliseconds(); + final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + + runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + + final long nowListConsumerGroupOffsets = time.milliseconds(); + + final int nodeId = response.node().id(); + + runnable.call(new Call("listConsumerGroupOffsets", deadline, new ConstantNodeIdProvider(nodeId)) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new OffsetFetchRequest.Builder(groupId, options.topicPartitions()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; + final Map groupOffsetsListing = new HashMap<>(); + for (Map.Entry entry : + response.responseData().entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + final Long offset = entry.getValue().offset; + final String metadata = entry.getValue().metadata; + groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata)); + } + groupOffsetListingFuture.complete(groupOffsetsListing); + } + + @Override + void handleFailure(Throwable throwable) { + groupOffsetListingFuture.completeExceptionally(throwable); + } + }, nowListConsumerGroupOffsets); + } + + @Override + void handleFailure(Throwable throwable) { + groupOffsetListingFuture.completeExceptionally(throwable); + } + }, nowFindCoordinator); + + return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture); + } + + @Override + public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) { + final KafkaFutureImpl>> deleteConsumerGroupsFuture = new KafkaFutureImpl<>(); + final Map> deleteConsumerGroupFutures = new HashMap<>(groupIds.size()); + final Set groupIdList = new HashSet<>(); + for (String groupId : groupIds) { + if (!deleteConsumerGroupFutures.containsKey(groupId)) { + deleteConsumerGroupFutures.put(groupId, new KafkaFutureImpl()); + groupIdList.add(groupId); + } + } + + for (final String groupId : groupIdList) { + + final long nowFindCoordinator = time.milliseconds(); + final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + + runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + + final long nowDeleteConsumerGroups = time.milliseconds(); + + final int nodeId = response.node().id(); + + runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DeleteGroupsRequest.Builder(Collections.singleton(groupId)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; + // Handle server responses for particular groupId. + for (Map.Entry> entry : deleteConsumerGroupFutures.entrySet()) { + final String groupId = entry.getKey(); + final KafkaFutureImpl future = entry.getValue(); + final Errors groupError = response.get(groupId); + if (groupError != Errors.NONE) { + future.completeExceptionally(groupError.exception()); + continue; + } + + future.complete(null); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable); + } + }, nowDeleteConsumerGroups); + + deleteConsumerGroupsFuture.complete(new HashMap>(deleteConsumerGroupFutures)); + } + + @Override + void handleFailure(Throwable throwable) { + deleteConsumerGroupsFuture.completeExceptionally(throwable); + } + }, nowFindCoordinator); + } + + return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java new file mode 100644 index 0000000..c6434eb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.List; + +/** + * Options for {@link AdminClient#listConsumerGroupOffsets(String)}. + *

+ * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListConsumerGroupOffsetsOptions extends AbstractOptions { + + private List topicPartitions = null; + + /** + * Set the topic partitions to list as part of the result. + * {@code null} includes all topic partitions. + * + * @param topicPartitions List of topic partitions to include + * @return This ListGroupOffsetsOptions + */ + public ListConsumerGroupOffsetsOptions topicPartitions(List topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns a list of topic partitions to add as part of the result. + */ + public List topicPartitions() { + return topicPartitions; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java new file mode 100644 index 0000000..23657b5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * The result of the {@link AdminClient#listConsumerGroupOffsets(String)} call. + *

+ * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListConsumerGroupOffsetsResult { + + final KafkaFuture> future; + + ListConsumerGroupOffsetsResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which yields a map of topic partitions to OffsetAndMetadata objects. + */ + public KafkaFuture> partitionsToOffsetAndMetadata() { + return future; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java new file mode 100644 index 0000000..86ca171 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#listConsumerGroups()}. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListConsumerGroupsOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java new file mode 100644 index 0000000..c725371 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * The result of the {@link AdminClient#listConsumerGroups()} call. + *

+ * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListConsumerGroupsResult { + private final KafkaFuture> future; + + ListConsumerGroupsResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which yields a collection of ConsumerGroupListing objects. + */ + public KafkaFuture> listings() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java new file mode 100644 index 0000000..bd95813 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; + +import java.util.List; + +/** + * A description of the assignments of a specific group member. + */ +public class MemberAssignment { + private final List topicPartitions; + + /** + * Creates an instance with the specified parameters. + * + * @param topicPartitions List of topic partitions + */ + public MemberAssignment(List topicPartitions) { + this.topicPartitions = topicPartitions; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MemberAssignment that = (MemberAssignment) o; + + return topicPartitions != null ? topicPartitions.equals(that.topicPartitions) : that.topicPartitions == null; + } + + @Override + public int hashCode() { + return topicPartitions != null ? topicPartitions.hashCode() : 0; + } + + /** + * The topic partitions assigned to a group member. + */ + public List topicPartitions() { + return topicPartitions; + } + + @Override + public String toString() { + return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java new file mode 100644 index 0000000..2ba1963 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * A detailed description of a single group instance in the cluster. + */ +public class MemberDescription { + + private final String memberId; + private final String clientId; + private final String host; + private final MemberAssignment assignment; + + /** + * Creates an instance with the specified parameters. + * + * @param memberId The consumer id + * @param clientId The client id + * @param host The host + * @param assignment The assignment + */ + public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { + this.memberId = memberId; + this.clientId = clientId; + this.host = host; + this.assignment = assignment; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MemberDescription that = (MemberDescription) o; + + if (memberId != null ? !memberId.equals(that.memberId) : that.memberId != null) return false; + if (clientId != null ? !clientId.equals(that.clientId) : that.clientId != null) return false; + return assignment != null ? assignment.equals(that.assignment) : that.assignment == null; + } + + @Override + public int hashCode() { + int result = memberId != null ? memberId.hashCode() : 0; + result = 31 * result + (clientId != null ? clientId.hashCode() : 0); + result = 31 * result + (assignment != null ? assignment.hashCode() : 0); + return result; + } + + /** + * The consumer id of the group member. + */ + public String consumerId() { + return memberId; + } + + /** + * The client id of the group member. + */ + public String clientId() { + return clientId; + } + + /** + * The host where the group member is running. + */ + public String host() { + return host; + } + + /** + * The assignment of the group member. + */ + public MemberAssignment assignment() { + return assignment; + } + + @Override + public String toString() { + return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + ", assignment=" + + assignment + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 9cd2e01..4af996c 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -106,6 +106,15 @@ public abstract class KafkaFuture implements Future { return allOfFuture; } + public KafkaFuture combine(KafkaFuture... futures) { + AllOfAdapter allOfWaiter = new AllOfAdapter<>(futures.length, this); + for (KafkaFuture future : futures) { + future.addWaiter(allOfWaiter); + } + + return this; + } + /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 0d4dee6..a242ed6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -18,6 +18,9 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; @@ -47,10 +50,15 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceType; @@ -64,6 +72,7 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -81,6 +90,7 @@ import static org.apache.kafka.common.requests.ResourceType.BROKER; import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -636,6 +646,188 @@ public class KafkaAdminClientTest { } } + //Ignoring test to be fixed on follow-up PR + @Ignore + @Test + public void testListConsumerGroups() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().setNode(env.cluster().controller()); + + env.kafkaClient().prepareResponse( + new MetadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + new ArrayList())); + + env.kafkaClient().prepareResponse( + new ListGroupsResponse( + Errors.NONE, + Arrays.asList( + new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE), + new ListGroupsResponse.Group("group-connect-1", "connector") + ))); + + final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); + final List consumerGroups = new ArrayList<>(); + + final KafkaFuture> listings = result.listings(); + consumerGroups.addAll(listings.get()); + assertEquals(1, consumerGroups.size()); + } + } + + @Test + public void testDescribeConsumerGroups() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().setNode(env.cluster().controller()); + + env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final Map groupMetadataMap = new HashMap<>(); + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); + TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); + TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); + + final List topicPartitions = new ArrayList<>(); + topicPartitions.add(0, myTopicPartition0); + topicPartitions.add(1, myTopicPartition1); + topicPartitions.add(2, myTopicPartition2); + + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions)); + + groupMetadataMap.put( + "group-0", + new DescribeGroupsResponse.GroupMetadata( + Errors.NONE, + "", + ConsumerProtocol.PROTOCOL_TYPE, + "", + Arrays.asList( + new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment), + new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment)))); + groupMetadataMap.put( + "group-connect-0", + new DescribeGroupsResponse.GroupMetadata( + Errors.NONE, + "", + "connect", + "", + Arrays.asList( + new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment), + new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment)))); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap)); + + final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0")); + final KafkaFuture groupDescriptionFuture = result.describedGroups().get().get("group-0"); + final ConsumerGroupDescription groupDescription = groupDescriptionFuture.get(); + + assertEquals(1, result.describedGroups().get().size()); + assertEquals("group-0", groupDescription.groupId()); + assertEquals(2, groupDescription.members().size()); + } + } + + @Test + public void testDescribeConsumerGroupOffsets() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().setNode(env.cluster().controller()); + + env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); + TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); + TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); + + final Map responseData = new HashMap<>(); + responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, "", Errors.NONE)); + responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, "", Errors.NONE)); + responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20, "", Errors.NONE)); + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); + + final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0"); + + assertEquals(3, result.partitionsToOffsetAndMetadata().get().size()); + final TopicPartition topicPartition = result.partitionsToOffsetAndMetadata().get().keySet().iterator().next(); + assertEquals("my_topic", topicPartition.topic()); + final OffsetAndMetadata offsetAndMetadata = result.partitionsToOffsetAndMetadata().get().values().iterator().next(); + assertEquals(10, offsetAndMetadata.offset()); + } + } + + @Test + public void testDeleteConsumerGroups() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final List groupIds = Collections.singletonList("group-0"); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().setNode(env.cluster().controller()); + + env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final Map response = new HashMap<>(); + response.put("group-0", Errors.NONE); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response)); + + final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds); + + final Map> results = result.deletedGroups().get(); + assertNull(results.get("group-0").get()); + } + } + private static void assertCollectionIs(Collection collection, T... elements) { for (T element : elements) { assertTrue("Did not find " + element, collection.contains(element)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 0f5df38..2fc7048 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -297,6 +297,26 @@ public class MockAdminClient extends AdminClient { } @Override + public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, DescribeConsumerGroupsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } -- To stop receiving notification emails like this one, please contact guozhang@apache.org.