kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6783) consumer poll(timeout) blocked infinitely when no available bootstrap server
Date Thu, 12 Apr 2018 15:11:02 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16435736#comment-16435736 ] 

ASF GitHub Bot commented on KAFKA-6783:
---------------------------------------

koqizhao closed pull request #4860: KAFKA-6783/6784: consumer poll(timeout) blocked infinitely when no available bootstrap server, FindCoordinatorResponse cannot be cast to FetchResponse
URL: https://github.com/apache/kafka/pull/4860

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/build.gradle b/build.gradle
index f83698050ed..69f560ec38f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -858,6 +858,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/config/*"
     include "**/org/apache/kafka/common/security/auth/*"
     include "**/org/apache/kafka/server/policy/*"
+    include "**/org/apache/kafka/common/security/token/delegation/*"
   }
 }
 
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index ccab85ce4fe..ad8545027a0 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
-      <property name="max" value="20"/>
+      <property name="max" value="25"/>
     </module>
     <module name="BooleanExpressionComplexity">
       <!-- default is 3 -->
@@ -114,7 +114,7 @@
 
     <module name="ClassFanOutComplexity">
       <!-- default is 20 -->
-      <property name="max" value="40"/>
+      <property name="max" value="50"/>
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 000acc38463..192d73557f7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -164,6 +164,8 @@
 
     <subpackage name="admin">
       <allow pkg="org.apache.kafka.clients.admin" />
+      <allow pkg="org.apache.kafka.clients.consumer.internals" />
+      <allow pkg="org.apache.kafka.clients.consumer" />
     </subpackage>
   </subpackage>
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0fec810a95b..2767132886d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 897e127d557..0171b617e7c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -535,4 +535,237 @@ public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> re
      */
     public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
                                                       DeleteRecordsOptions options);
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
+     * See the overload for more details.</p>
+     *
+     * @return                      The CreateDelegationTokenResult.
+     */
+    public CreateDelegationTokenResult createDelegationToken() {
+        return createDelegationToken(new CreateDelegationTokenOptions());
+    }
+
+
+    /**
+     * <p>Create a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+     *     if the renewer's principal type is not supported.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when creating delegation token.
+     * @return                      The CreateDelegationTokenResult.
+     */
+    public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+
+
+    /**
+     * <p>Renew a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
+     * See the overload for more details.</p>
+     *
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Renew a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the token.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when renewing delegation token.
+     * @return                      The RenewDelegationTokenResult.
+     */
+    public abstract RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
+     * This will expire the token immediately. See the overload for more details.</p>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Expire a Delegation Token.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code ExpireDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     *     if the delegation token is not found on server.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     *     if the authenticated user is not owner/renewer of the requested token.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     *     if the delegation token is expired.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac                  HMAC of the Delegation token
+     * @param options               The options to use when expiring delegation token.
+     * @return                      The ExpireDelegationTokenResult.
+     */
+    public abstract ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+
+    /**
+     * <p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+     * This will return all user-owned tokens and tokens for which the user has Describe permission. See the overload for more details.</p>
+     *
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public DescribeDelegationTokenResult describeDelegationToken() {
+        return describeDelegationToken(new DescribeDelegationTokenOptions());
+    }
+
+    /**
+     * <p>Describe the Delegation Tokens.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.1.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@code DescribeDelegationTokenResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *     If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     *     <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     *     if the delegation token feature is disabled.</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options               The options to use when describing delegation tokens.
+     * @return                      The DescribeDelegationTokenResult.
+     */
+    public abstract DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
+
+    /**
+     * Describe some consumer groups in the cluster.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param options  The options to use when describing the groups.
+     * @return The DescribeConsumerGroupsResult.
+     */
+    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
+                                                                        DescribeConsumerGroupsOptions options);
+
+    /**
+     * Describe some consumer groups in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for
+     * {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)} with
+     * default options. See the overload for more details.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @return The DescribeConsumerGroupsResult.
+     */
+    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
+        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer groups available in the cluster.
+     *
+     * @param options           The options to use when listing the consumer groups.
+     * @return The ListConsumerGroupsResult.
+     */
+    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
+
+    /**
+     * List the consumer groups available in the cluster with the default options.
+     *
+     * This is a convenience method for {@link AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The ListConsumerGroupsResult.
+     */
+    public ListConsumerGroupsResult listConsumerGroups() {
+        return listConsumerGroups(new ListConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer group offsets for the given consumer group.
+     *
+     * @param groupId           The group id whose offsets are to be listed.
+     * @param options           The options to use when listing the consumer group offsets.
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    public abstract ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+
+    /**
+     * List the consumer group offsets for the given consumer group with the default options.
+     *
+     * This is a convenience method for {@link AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+     *
+     * @param groupId The group id whose offsets are to be listed.
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
+        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+    }
+
+    /**
+     * Delete consumer groups from the cluster.
+     *
+     * @param groupIds          The ids of the groups to delete.
+     * @param options           The options to use when deleting a consumer group.
+     * @return The DeleteConsumerGroupsResult.
+     */
+    public abstract DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
+
+    /**
+     * Delete consumer groups from the cluster with the default options.
+     *
+     * @param groupIds The ids of the groups to delete.
+     * @return The DeleteConsumerGroupsResult.
+     */
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
+        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
+    }
 }
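
As a usage sketch (not part of this patch): the delegation token calls added to AdminClient above could be driven roughly as follows, assuming a broker with delegation tokens enabled and an AdminClient authenticated over SASL (requests on PLAINTEXT or 1-way SSL channels fail with UnsupportedByAuthenticationException, per the Javadoc above). The bootstrap address is a placeholder.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.security.token.delegation.DelegationToken;

    public class DelegationTokenLifecycle {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                // Create a token with default options, then renew it, then expire it.
                DelegationToken token = admin.createDelegationToken().delegationToken().get();
                long renewedUntil = admin.renewDelegationToken(token.hmac()).expiryTimestamp().get();
                System.out.println("token " + token.tokenInfo().tokenId() + " renewed until " + renewedUntil);
                // The default ExpireDelegationTokenOptions invalidates the token immediately.
                admin.expireDelegationToken(token.hmac()).expiryTimestamp().get();
            }
        }
    }
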
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
new file mode 100644
index 00000000000..0bfa8a782d5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+/**
+ * A detailed description of a single consumer group in the cluster.
+ */
+public class ConsumerGroupDescription {
+
+    private final String groupId;
+    private final boolean isSimpleConsumerGroup;
+    private final List<MemberDescription> members;
+    private final String partitionAssignor;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param groupId               The consumer group id
+     * @param isSimpleConsumerGroup Whether the consumer group is a simple consumer group
+     * @param members               The consumer group members
+     * @param partitionAssignor     The consumer group partition assignor
+     */
+    public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) {
+        this.groupId = groupId;
+        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.members = members;
+        this.partitionAssignor = partitionAssignor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ConsumerGroupDescription that = (ConsumerGroupDescription) o;
+
+        if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
+        if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false;
+        if (members != null ? !members.equals(that.members) : that.members != null) return false;
+        return partitionAssignor != null ? partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = groupId != null ? groupId.hashCode() : 0;
+        result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
+        result = 31 * result + (members != null ? members.hashCode() : 0);
+        result = 31 * result + (partitionAssignor != null ? partitionAssignor.hashCode() : 0);
+        return result;
+    }
+
+    /**
+     * The id of the consumer group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * Whether the consumer group is a simple consumer group or not.
+     */
+    public boolean isSimpleConsumerGroup() {
+        return isSimpleConsumerGroup;
+    }
+
+    /**
+     * A list of the members of the consumer group.
+     */
+    public List<MemberDescription> members() {
+        return members;
+    }
+
+    /**
+     * The consumer group partition assignor.
+     */
+    public String partitionAssignor() {
+        return partitionAssignor;
+    }
+
+    @Override
+    public String toString() {
+        return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", members=" +
+            Utils.join(members, ",") + ", partitionAssignor=" + partitionAssignor + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
new file mode 100644
index 00000000000..46da9628010
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A listing of a consumer group in the cluster.
+ */
+public class ConsumerGroupListing {
+    private final String groupId;
+    private final boolean isSimpleConsumerGroup;
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param isSimpleConsumerGroup Whether the consumer group is simple or not.
+     */
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) {
+        this.groupId = groupId;
+        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+    }
+
+    /**
+     * The consumer group id.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * Whether the consumer group is simple or not.
+     */
+    public boolean isSimpleConsumerGroup() {
+        return isSimpleConsumerGroup;
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "groupId='" + groupId + '\'' +
+            ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+            ')';
+    }
+}
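
For orientation (not part of the patch), a minimal sketch of how the two value classes above relate: a ConsumerGroupListing is the lightweight form returned by listConsumerGroups(), while a ConsumerGroupDescription is the detailed form returned by describeConsumerGroups(). The empty member list and the "range" assignor name are placeholder values; MemberDescription is added elsewhere in this patch.

    import java.util.Collections;

    import org.apache.kafka.clients.admin.ConsumerGroupDescription;
    import org.apache.kafka.clients.admin.ConsumerGroupListing;
    import org.apache.kafka.clients.admin.MemberDescription;

    public class GroupValueClasses {
        public static void main(String[] args) {
            ConsumerGroupListing listing = new ConsumerGroupListing("my-group", false);
            ConsumerGroupDescription description = new ConsumerGroupDescription(
                "my-group", false, Collections.<MemberDescription>emptyList(), "range");
            System.out.println(listing);     // (groupId='my-group', isSimpleConsumerGroup=false)
            System.out.println(description); // (groupId=my-group, isSimpleConsumerGroup=false, members=, partitionAssignor=range)
        }
    }
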
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
new file mode 100644
index 00000000000..1b77b943800
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#createDelegationToken(CreateDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
+    private long maxLifeTimeMs = -1;
+    private List<KafkaPrincipal> renewers = new LinkedList<>();
+
+    public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) {
+        this.renewers = renewers;
+        return this;
+    }
+
+    public List<KafkaPrincipal> renewers() {
+        return renewers;
+    }
+
+    public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
+        this.maxLifeTimeMs = maxLifeTimeMs;
+        return this;
+    }
+
+    public long maxlifeTimeMs() {
+        return maxLifeTimeMs;
+    }
+}
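
A short sketch (not part of the patch) of building these options; the principal name and lifetime are placeholders. Note the setter is spelled maxlifeTimeMs, as defined above.

    import java.util.Collections;

    import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;

    public class TokenOptionsExample {
        public static void main(String[] args) {
            // Restrict renewal to a single principal and cap the token lifetime at one hour.
            KafkaPrincipal renewer = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob");
            CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                .renewers(Collections.singletonList(renewer))
                .maxlifeTimeMs(60 * 60 * 1000L);
            System.out.println(options.renewers() + ", maxLifeTimeMs=" + options.maxlifeTimeMs());
        }
    }
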
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
new file mode 100644
index 00000000000..043cbe87fef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateDelegationTokenResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#createDelegationToken(CreateDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class CreateDelegationTokenResult {
+    private final KafkaFuture<DelegationToken> delegationToken;
+
+    CreateDelegationTokenResult(KafkaFuture<DelegationToken> delegationToken) {
+        this.delegationToken = delegationToken;
+    }
+
+    /**
+     * Returns a future which yields the created delegation token.
+     */
+    public KafkaFuture<DelegationToken> delegationToken() {
+        return delegationToken;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
new file mode 100644
index 00000000000..cd505f4c111
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupsOptions extends AbstractOptions<DeleteConsumerGroupsOptions> {
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
new file mode 100644
index 00000000000..b4bce264405
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupsResult {
+    final KafkaFuture<Map<String, KafkaFuture<Void>>> futures;
+
+    DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() {
+        return futures;
+    }
+}
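
A usage sketch (not part of the patch) for the result type above, assuming a reachable cluster at a placeholder bootstrap address. The outer future yields a map from group id to a per-group future, so individual deletions can fail independently:

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.KafkaFuture;

    public class DeleteGroupsExample {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                Map<String, KafkaFuture<Void>> perGroup =
                    admin.deleteConsumerGroups(Arrays.asList("group-a", "group-b")).deletedGroups().get();
                for (Map.Entry<String, KafkaFuture<Void>> entry : perGroup.entrySet()) {
                    try {
                        entry.getValue().get();
                        System.out.println(entry.getKey() + ": deleted");
                    } catch (ExecutionException e) {
                        System.out.println(entry.getKey() + ": " + e.getCause());
                    }
                }
            }
        }
    }
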
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
new file mode 100644
index 00000000000..7daff1a483b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
new file mode 100644
index 00000000000..adde031b678
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * The result of the {@link KafkaAdminClient#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeConsumerGroupsResult {
+
+    private final KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures;
+
+    public DescribeConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a future which yields a map from group id to futures which can be used to check the description of a consumer group.
+     */
+    public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> describedGroups() {
+        return futures;
+    }
+}
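
A usage sketch (not part of the patch) for unwrapping the nested futures above; the group id and bootstrap address are placeholders:

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;
    import org.apache.kafka.common.KafkaFuture;

    public class DescribeGroupsExample {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                // The outer future completes once the brokers have been queried;
                // each inner future completes with (or fails on) that group's description.
                Map<String, KafkaFuture<ConsumerGroupDescription>> groups =
                    admin.describeConsumerGroups(Arrays.asList("my-group")).describedGroups().get();
                for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : groups.entrySet())
                    System.out.println(entry.getKey() + " -> " + entry.getValue().get());
            }
        }
    }
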
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
new file mode 100644
index 00000000000..60b99354e35
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+/**
+ * Options for {@link AdminClient#describeDelegationToken(DescribeDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
+    private List<KafkaPrincipal> owners;
+
+    /**
+     * If owners is null, all user-owned tokens and tokens for which the user has Describe permission
+     * will be returned.
+     * @param owners The owners for which to describe delegation tokens
+     * @return this instance
+     */
+    public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
+        this.owners = owners;
+        return this;
+    }
+
+    public List<KafkaPrincipal> owners() {
+        return owners;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
new file mode 100644
index 00000000000..7a9d4b9dd97
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeDelegationToken(DescribeDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeDelegationTokenResult {
+    private final KafkaFuture<List<DelegationToken>> delegationTokens;
+
+    DescribeDelegationTokenResult(KafkaFuture<List<DelegationToken>> delegationTokens) {
+        this.delegationTokens = delegationTokens;
+    }
+
+    /**
+     * Returns a future which yields the list of delegation tokens.
+     */
+    public KafkaFuture<List<DelegationToken>> delegationTokens() {
+        return delegationTokens;
+    }
+}
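
A usage sketch (not part of the patch) combining the two classes above to list tokens owned by a particular principal; leaving owners null would return every token visible to the caller, per the Javadoc. The principal name and bootstrap address are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.common.security.token.delegation.DelegationToken;

    public class DescribeTokensExample {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                KafkaPrincipal alice = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
                DescribeDelegationTokenOptions options =
                    new DescribeDelegationTokenOptions().owners(Collections.singletonList(alice));
                for (DelegationToken token : admin.describeDelegationToken(options).delegationTokens().get())
                    System.out.println(token.tokenInfo().tokenId());
            }
        }
    }
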
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
new file mode 100644
index 00000000000..138cd4e69e4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
+    private long expiryTimePeriodMs = -1L;
+
+    public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) {
+        this.expiryTimePeriodMs = expiryTimePeriodMs;
+        return this;
+    }
+
+    public long expiryTimePeriodMs() {
+        return expiryTimePeriodMs;
+    }
+}
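
A short sketch (not part of the patch): the only knob here is expiryTimePeriodMs. Per the AdminClient Javadoc above, the default of -1 expires the token immediately; a positive value shortens its remaining lifetime instead. The one-minute value is a placeholder.

    import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;

    public class ExpireOptionsExample {
        public static void main(String[] args) {
            // Shorten the token's remaining lifetime to one minute rather than expiring it at once.
            ExpireDelegationTokenOptions options = new ExpireDelegationTokenOptions()
                .expiryTimePeriodMs(60 * 1000L);
            System.out.println("expiryTimePeriodMs=" + options.expiryTimePeriodMs());
        }
    }
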
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
new file mode 100644
index 00000000000..41782bdcb5c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link KafkaAdminClient#expireDelegationToken(byte[], ExpireDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ExpireDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    ExpireDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    /**
+     * Returns a future which yields the expiry timestamp.
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 511895354ab..50bcfd38856 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -27,6 +27,9 @@
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -69,6 +72,8 @@
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
@@ -77,6 +82,8 @@
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -85,19 +92,37 @@
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -2072,4 +2097,452 @@ void handleFailure(Throwable throwable) {
 
         return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
     }
+
+    @Override
+    public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
+        final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                CreateDelegationTokenResponse response = (CreateDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    delegationTokenFuture.completeExceptionally(response.error().exception());
+                } else {
+                    TokenInformation tokenInfo =  new TokenInformation(response.tokenId(), response.owner(),
+                        options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
+                    DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
+                    delegationTokenFuture.complete(token);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                delegationTokenFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new CreateDelegationTokenResult(delegationTokenFuture);
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new RenewDelegationTokenRequest.Builder(hmac, options.renewTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                RenewDelegationTokenResponse response = (RenewDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new RenewDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
+        final KafkaFutureImpl<Long>  expiryTimeFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ExpireDelegationTokenResponse response = (ExpireDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    expiryTimeFuture.completeExceptionally(response.error().exception());
+                } else {
+                    expiryTimeFuture.complete(response.expiryTimestamp());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                expiryTimeFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new ExpireDelegationTokenResult(expiryTimeFuture);
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
+        final KafkaFutureImpl<List<DelegationToken>>  tokensFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeDelegationTokenRequest.Builder(options.owners());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeDelegationTokenResponse response = (DescribeDelegationTokenResponse) abstractResponse;
+                if (response.hasError()) {
+                    tokensFuture.completeExceptionally(response.error().exception());
+                } else {
+                    tokensFuture.complete(response.tokens());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                tokensFuture.completeExceptionally(throwable);
+            }
+        }, now);
+
+        return new DescribeDelegationTokenResult(tokensFuture);
+    }
+
+    @Override
+    public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
+                                                               final DescribeConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>();
+        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> consumerGroupFutures = new HashMap<>(groupIds.size());
+        final ArrayList<String> groupIdList = new ArrayList<>();
+        for (String groupId : groupIds) {
+            if (!consumerGroupFutures.containsKey(groupId)) {
+                consumerGroupFutures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
+                groupIdList.add(groupId);
+            }
+        }
+
+        for (final String groupId : groupIdList) {
+
+            final long nowFindCoordinator = time.milliseconds();
+            final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+
+            runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+
+                    final long nowDescribeConsumerGroups = time.milliseconds();
+
+                    final int nodeId = response.node().id();
+
+                    runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new DescribeGroupsRequest.Builder(groupIdList);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+                            // Handle server responses for particular groupId.
+                            for (Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : consumerGroupFutures.entrySet()) {
+                                final String groupId = entry.getKey();
+                                final KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue();
+                                final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId);
+                                final Errors groupError = groupMetadata.error();
+                                if (groupError != Errors.NONE) {
+                                    future.completeExceptionally(groupError.exception());
+                                    continue;
+                                }
+
+                                final String protocolType = groupMetadata.protocolType();
+                                if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                                    final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+                                    final List<MemberDescription> consumers = new ArrayList<>(members.size());
+
+                                    for (DescribeGroupsResponse.GroupMember groupMember : members) {
+                                        final PartitionAssignor.Assignment assignment =
+                                                ConsumerProtocol.deserializeAssignment(
+                                                        ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
+
+                                        final MemberDescription memberDescription =
+                                                new MemberDescription(
+                                                        groupMember.memberId(),
+                                                        groupMember.clientId(),
+                                                        groupMember.clientHost(),
+                                                        new MemberAssignment(assignment.partitions()));
+                                        consumers.add(memberDescription);
+                                    }
+                                    final String protocol = groupMetadata.protocol();
+                                    final ConsumerGroupDescription consumerGroupDescription =
+                                            new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                    future.complete(consumerGroupDescription);
+                                }
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(consumerGroupFutures.values(), throwable);
+                        }
+                    }, nowDescribeConsumerGroups);
+
+                    resultFutures.complete(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures));
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    resultFutures.completeExceptionally(throwable);
+                }
+            }, nowFindCoordinator);
+        }
+
+        return new DescribeConsumerGroupsResult(resultFutures);
+    }
+
+    @Override
+    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<Collection<ConsumerGroupListing>>();
+
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+
+        runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
+
+                final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>();
+
+                for (final Node node : metadataResponse.brokers()) {
+                    futures.put(node, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                }
+
+                future.combine(futures.values().toArray(new KafkaFuture[0])).thenApply(
+                        new KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {
+                            @Override
+                            public Collection<ConsumerGroupListing> apply(Collection<ConsumerGroupListing> v) {
+                                List<ConsumerGroupListing> listings = new ArrayList<>();
+                                for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                                    Collection<ConsumerGroupListing> results;
+                                    try {
+                                        results = entry.getValue().get();
+                                    } catch (Throwable e) {
+                                        // This should be unreachable, since KafkaFuture#combine completes this
+                                        // future exceptionally as soon as any of the combined futures fails.
+                                        throw new KafkaException("ListConsumerGroupsResult#listings(): internal error", e);
+                                    }
+                                    listings.addAll(results);
+                                }
+                                return listings;
+                            }
+                        });
+
+                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                    final long nowList = time.milliseconds();
+
+                    final int brokerId = entry.getKey().id();
+
+                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
+
+                        private final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new ListGroupsRequest.Builder();
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
+                            final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
+                            for (ListGroupsResponse.Group group : response.groups()) {
+                                if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
+                                    final String groupId = group.groupId();
+                                    final String protocolType = group.protocolType();
+                                    final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                    groupsListing.add(groupListing);
+                                }
+                            }
+                            future.complete(groupsListing);
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(futures.values(), throwable);
+                        }
+                    }, nowList);
+
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        }, nowMetadata);
+
+        return new ListConsumerGroupsResult(future);
+    }
+
+    @Override
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
+        final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>();
+
+        final long nowFindCoordinator = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+
+        runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+
+                final long nowListConsumerGroupOffsets = time.milliseconds();
+
+                final int nodeId = response.node().id();
+
+                runnable.call(new Call("listConsumerGroupOffsets", deadline, new ConstantNodeIdProvider(nodeId)) {
+                    @Override
+                    AbstractRequest.Builder createRequest(int timeoutMs) {
+                        return new OffsetFetchRequest.Builder(groupId, options.topicPartitions());
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
+                        final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
+                        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                                response.responseData().entrySet()) {
+                            final TopicPartition topicPartition = entry.getKey();
+                            final Long offset = entry.getValue().offset;
+                            final String metadata = entry.getValue().metadata;
+                            groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata));
+                        }
+                        groupOffsetListingFuture.complete(groupOffsetsListing);
+                    }
+
+                    @Override
+                    void handleFailure(Throwable throwable) {
+                        groupOffsetListingFuture.completeExceptionally(throwable);
+                    }
+                }, nowListConsumerGroupOffsets);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                groupOffsetListingFuture.completeExceptionally(throwable);
+            }
+        }, nowFindCoordinator);
+
+        return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+    }
+
+    @Override
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> deleteConsumerGroupsFuture = new KafkaFutureImpl<>();
+        final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = new HashMap<>(groupIds.size());
+        final Set<String> groupIdList = new HashSet<>();
+        for (String groupId : groupIds) {
+            if (!deleteConsumerGroupFutures.containsKey(groupId)) {
+                deleteConsumerGroupFutures.put(groupId, new KafkaFutureImpl<Void>());
+                groupIdList.add(groupId);
+            }
+        }
+
+        for (final String groupId : groupIdList) {
+
+            final long nowFindCoordinator = time.milliseconds();
+            final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs());
+
+            runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+
+                    final long nowDeleteConsumerGroups = time.milliseconds();
+
+                    final int nodeId = response.node().id();
+
+                    runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new DeleteGroupsRequest.Builder(Collections.singleton(groupId));
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
+                            // Handle the server response for each requested groupId.
+                            for (Map.Entry<String, KafkaFutureImpl<Void>> entry : deleteConsumerGroupFutures.entrySet()) {
+                                final String groupId = entry.getKey();
+                                final KafkaFutureImpl<Void> future = entry.getValue();
+                                final Errors groupError = response.get(groupId);
+                                if (groupError != Errors.NONE) {
+                                    future.completeExceptionally(groupError.exception());
+                                    continue;
+                                }
+
+                                future.complete(null);
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable);
+                        }
+                    }, nowDeleteConsumerGroups);
+
+                    deleteConsumerGroupsFuture.complete(new HashMap<String, KafkaFuture<Void>>(deleteConsumerGroupFutures));
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    deleteConsumerGroupsFuture.completeExceptionally(throwable);
+                }
+            }, nowFindCoordinator);
+        }
+
+        return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture);
+    }
 }
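
For orientation, the consumer-group methods added to KafkaAdminClient above compose as in
the following minimal sketch. The bootstrap address and group name are illustrative, and
the ConsumerGroupListing#groupId() accessor is assumed rather than shown in this diff:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ConsumerGroupListing;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumerGroupAdminSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            try (AdminClient admin = AdminClient.create(props)) {
                // enumerate the consumer groups visible on the brokers
                for (ConsumerGroupListing listing : admin.listConsumerGroups().listings().get())
                    System.out.println(listing.groupId()); // accessor assumed, not shown in this diff
                // fetch the committed offsets of one group across all of its partitions
                Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
                System.out.println(offsets);
            }
        }
    }
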
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
new file mode 100644
index 00000000000..c6434ebb15c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * Options for {@link AdminClient#listConsumerGroupOffsets(String)}.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
+
+    private List<TopicPartition> topicPartitions = null;
+
+    /**
+     * Set the topic partitions to list as part of the result.
+     * {@code null} includes all topic partitions.
+     *
+     * @param topicPartitions List of topic partitions to include
+     * @return This ListConsumerGroupOffsetsOptions
+     */
+    public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the list of topic partitions to include in the result; {@code null} means all partitions.
+     */
+    public List<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+}
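
The partition filter above is the only option this class adds. A short sketch, reusing the
admin client from the earlier sketch (topic and group names illustrative; also requires
java.util.Arrays):

    // restrict the listing to one partition; leaving topicPartitions unset (null) lists all
    ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
        .topicPartitions(Arrays.asList(new TopicPartition("my_topic", 0)));
    admin.listConsumerGroupOffsets("my-group", options);
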
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
new file mode 100644
index 00000000000..23657b5ad3c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#listConsumerGroupOffsets(String)} call.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsResult {
+
+    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
+     */
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
+        return future;
+    }
+
+}
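
Consuming the result is a matter of blocking on the future it wraps, e.g. (continuing the
earlier sketch):

    Map<TopicPartition, OffsetAndMetadata> offsets =
        admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet())
        System.out.println(e.getKey() + " -> offset " + e.getValue().offset());
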
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
new file mode 100644
index 00000000000..86ca171b726
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#listConsumerGroups()}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
new file mode 100644
index 00000000000..c7253710440
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The result of the {@link AdminClient#listConsumerGroups()} call.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupsResult {
+    private final KafkaFuture<Collection<ConsumerGroupListing>> future;
+
+    ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a collection of ConsumerGroupListing objects.
+     */
+    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
+        return future;
+    }
+}
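
Because listings() exposes a KafkaFuture rather than a plain collection, callers can chain
a transformation instead of blocking, in the same BaseFunction style KafkaAdminClient
itself uses above:

    KafkaFuture<Integer> groupCount = admin.listConsumerGroups().listings().thenApply(
        new KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, Integer>() {
            @Override
            public Integer apply(Collection<ConsumerGroupListing> listings) {
                return listings.size(); // runs once the combined listing is available
            }
        });
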
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
new file mode 100644
index 00000000000..bd958132b7c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+/**
+ * A description of the assignments of a specific group member.
+ */
+public class MemberAssignment {
+    private final List<TopicPartition> topicPartitions;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param topicPartitions List of topic partitions
+     */
+    public MemberAssignment(List<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MemberAssignment that = (MemberAssignment) o;
+
+        return topicPartitions != null ? topicPartitions.equals(that.topicPartitions) : that.topicPartitions == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartitions != null ? topicPartitions.hashCode() : 0;
+    }
+
+    /**
+     * The topic partitions assigned to a group member.
+     */
+    public List<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
new file mode 100644
index 00000000000..2ba19634208
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A detailed description of a single group member in the cluster.
+ */
+public class MemberDescription {
+
+    private final String memberId;
+    private final String clientId;
+    private final String host;
+    private final MemberAssignment assignment;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param memberId   The consumer id
+     * @param clientId   The client id
+     * @param host       The host
+     * @param assignment The assignment
+     */
+    public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) {
+        this.memberId = memberId;
+        this.clientId = clientId;
+        this.host = host;
+        this.assignment = assignment;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MemberDescription that = (MemberDescription) o;
+
+        if (memberId != null ? !memberId.equals(that.memberId) : that.memberId != null) return false;
+        if (clientId != null ? !clientId.equals(that.clientId) : that.clientId != null) return false;
+        if (host != null ? !host.equals(that.host) : that.host != null) return false;
+        return assignment != null ? assignment.equals(that.assignment) : that.assignment == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = memberId != null ? memberId.hashCode() : 0;
+        result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
+        result = 31 * result + (host != null ? host.hashCode() : 0);
+        result = 31 * result + (assignment != null ? assignment.hashCode() : 0);
+        return result;
+    }
+
+    /**
+     * The consumer id of the group member.
+     */
+    public String consumerId() {
+        return memberId;
+    }
+
+    /**
+     * The client id of the group member.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * The host where the group member is running.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The assignment of the group member.
+     */
+    public MemberAssignment assignment() {
+        return assignment;
+    }
+
+    @Override
+    public String toString() {
+        return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + ", assignment=" +
+            assignment + ")";
+    }
+}
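
The two value classes above compose as in this sketch (all literals illustrative; also
requires java.util.Arrays and org.apache.kafka.common.TopicPartition):

    MemberAssignment assignment = new MemberAssignment(Arrays.asList(
        new TopicPartition("my_topic", 0),
        new TopicPartition("my_topic", 1)));
    MemberDescription member = new MemberDescription(
        "consumer-1-uuid", "consumer-1", "/127.0.0.1", assignment);
    // toString() -> (memberId=consumer-1-uuid, clientId=consumer-1, host=/127.0.0.1,
    //               assignment=(topicPartitions=my_topic-0,my_topic-1))
    System.out.println(member);
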
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
new file mode 100644
index 00000000000..238dc4a3494
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenOptions extends AbstractOptions<RenewDelegationTokenOptions> {
+    private long renewTimePeriodMs = -1;
+
+    public RenewDelegationTokenOptions renewTimePeriodMs(long renewTimePeriodMs) {
+        this.renewTimePeriodMs = renewTimePeriodMs;
+        return this;
+    }
+
+    public long renewTimePeriodMs() {
+        return renewTimePeriodMs;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
new file mode 100644
index 00000000000..38cdf1ae1b2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RenewDelegationTokenResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link AdminClient#renewDelegationToken(byte[], RenewDelegationTokenOptions)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RenewDelegationTokenResult {
+    private final KafkaFuture<Long> expiryTimestamp;
+
+    RenewDelegationTokenResult(KafkaFuture<Long> expiryTimestamp) {
+        this.expiryTimestamp = expiryTimestamp;
+    }
+
+    /**
+     * Returns a future which yields the expiry timestamp of the renewed token.
+     */
+    public KafkaFuture<Long> expiryTimestamp() {
+        return expiryTimestamp;
+    }
+}
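
Together with RenewDelegationTokenOptions above, a renewal call takes the token HMAC as a
byte array. A sketch, where `token` is a previously created DelegationToken whose hmac()
accessor is assumed here (also requires java.util.concurrent.TimeUnit):

    RenewDelegationTokenOptions options =
        new RenewDelegationTokenOptions().renewTimePeriodMs(TimeUnit.DAYS.toMillis(1));
    long expiryTimestamp = admin.renewDelegationToken(token.hmac(), options)
        .expiryTimestamp().get(); // epoch millis at which the renewed token will expire
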
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dd4bb7038f3..82b6f69868f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -308,12 +308,15 @@ protected synchronized long timeToNextHeartbeat(long now) {
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
-    public void ensureActiveGroup() {
+    public void ensureActiveGroup(long now, long remainingMs) {
         // always ensure that the coordinator is ready, because we may have been disconnected
         // when sending heartbeats, which does not necessarily require us to rejoin the group.
-        ensureCoordinatorReady();
+        ensureCoordinatorReady(now, remainingMs);
         startHeartbeatThreadIfNeeded();
-        joinGroupIfNeeded();
+
+        remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
+        now = time.milliseconds();
+        joinGroupIfNeeded(now, remainingMs);
     }
 
     private synchronized void startHeartbeatThreadIfNeeded() {
@@ -346,9 +349,9 @@ private void closeHeartbeatThread() {
     }
 
     // visible for testing. Joins the group without starting the heartbeat thread.
-    void joinGroupIfNeeded() {
+    void joinGroupIfNeeded(long now, long remainingMs) {
         while (needRejoin() || rejoinIncomplete()) {
-            ensureCoordinatorReady();
+            ensureCoordinatorReady(now, remainingMs);
 
             // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
             // time if the client is woken up before a pending rebalance completes. This must be called
@@ -375,12 +378,18 @@ void joinGroupIfNeeded() {
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||
-                        exception instanceof IllegalGenerationException)
-                    continue;
-                else if (!future.isRetriable())
+                        exception instanceof IllegalGenerationException) {
+                    // recoverable rebalance errors: fall through to re-check the
+                    // remaining time budget below and retry
+                } else if (!future.isRetriable())
                     throw exception;
-                time.sleep(retryBackoffMs);
+                else
+                    time.sleep(retryBackoffMs);
             }
+
+            remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
+            if (remainingMs == 0)
+                break;
+            now = time.milliseconds();
         }
     }
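
The shape of the time-budget threading introduced here, in isolation (pollTimeoutMs is a
hypothetical caller-supplied budget; the two step methods are the ones patched above):

    long now = time.milliseconds();
    long remainingMs = pollTimeoutMs;          // total budget supplied by the caller

    ensureCoordinatorReady(now, remainingMs);  // step 1 may consume part of the budget

    // charge whatever step 1 used before handing the remainder to step 2
    remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
    now = time.milliseconds();
    joinGroupIfNeeded(now, remainingMs);       // step 2 gives up once the budget hits 0
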
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c966d54..fcd26010061 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -276,7 +276,9 @@ public void poll(long now, long remainingMs) {
 
         if (subscriptions.partitionsAutoAssigned()) {
             if (coordinatorUnknown()) {
-                ensureCoordinatorReady();
+                ensureCoordinatorReady(now, remainingMs);
+
+                remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
                 now = time.milliseconds();
             }
 
@@ -287,7 +289,7 @@ public void poll(long now, long remainingMs) {
                 if (subscriptions.hasPatternSubscription())
                     client.ensureFreshMetadata();
 
-                ensureActiveGroup();
+                ensureActiveGroup(now, remainingMs);
                 now = time.milliseconds();
             }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c2466..01e25196dfc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -203,6 +203,10 @@ public int sendFetches() {
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
                         public void onSuccess(ClientResponse resp) {
+                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                            // KAFKA-6784: guard against a response of another type (e.g. a
+                            // FindCoordinatorResponse) being cast blindly to FetchResponse
+                            if (!(resp.responseBody() instanceof FetchResponse))
+                                return;
+
                             FetchResponse response = (FetchResponse) resp.responseBody();
                             FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                             if (handler == null) {
@@ -227,8 +231,6 @@ public void onSuccess(ClientResponse resp) {
                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                         resp.requestHeader().apiVersion()));
                             }
-
-                            sensors.fetchLatency.record(resp.requestLatencyMs());
                         }
 
                         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01dc42..4af996cb6b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -106,6 +106,15 @@ private void maybeComplete() {
         return allOfFuture;
     }
 
+    /**
+     * Completes this future once all of the given futures have completed; if any of them
+     * fails, this future is completed exceptionally. The completion carries no value of its
+     * own, so callers typically chain {@link #thenApply(BaseFunction)} to compute a combined
+     * result, as listConsumerGroups does above.
+     */
+    public KafkaFuture<T> combine(KafkaFuture<?>... futures) {
+        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, this);
+        for (KafkaFuture<?> future : futures) {
+            future.addWaiter(allOfWaiter);
+        }
+
+        return this;
+    }
+
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is executed with this
      * future's result as the argument to the supplied function.
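
A self-contained sketch of combine, following the combine-then-thenApply shape used in
listConsumerGroups above (KafkaFutureImpl lives in org.apache.kafka.common.internals):

    final KafkaFutureImpl<String> first = new KafkaFutureImpl<>();
    final KafkaFutureImpl<String> second = new KafkaFutureImpl<>();
    final KafkaFutureImpl<String> both = new KafkaFutureImpl<>();

    // 'both' completes (carrying no value of its own) once first and second complete
    KafkaFuture<String> joined = both.combine(first, second).thenApply(
        new KafkaFuture.BaseFunction<String, String>() {
            @Override
            public String apply(String ignored) {
                try {
                    return first.get() + "/" + second.get(); // both are complete here
                } catch (Exception e) {
                    throw new KafkaException("unreachable: a failure would have failed 'both'", e);
                }
            }
        });

    first.complete("a");
    second.complete("b");
    System.out.println(joined.get()); // prints a/b
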
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 06c8c7f362b..22f273d9d93 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -24,8 +24,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -40,7 +43,7 @@
     private final String name;
     private final Sensor[] parents;
     private final List<Stat> stats;
-    private final List<KafkaMetric> metrics;
+    private final Map<MetricName, KafkaMetric> metrics;
     private final MetricConfig config;
     private final Time time;
     private volatile long lastRecordTime;
@@ -103,7 +106,7 @@ public boolean shouldRecord(final int configId) {
         this.registry = registry;
         this.name = Utils.notNull(name);
         this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList<>();
+        this.metrics = new LinkedHashMap<>();
         this.stats = new ArrayList<>();
         this.config = config;
         this.time = time;
@@ -190,7 +193,7 @@ public void checkQuotas() {
     }
 
     public void checkQuotas(long timeMs) {
-        for (KafkaMetric metric : this.metrics) {
+        for (KafkaMetric metric : this.metrics.values()) {
             MetricConfig config = metric.config();
             if (config != null) {
                 Quota quota = config.quota();
@@ -228,9 +231,11 @@ public synchronized boolean add(CompoundStat stat, MetricConfig config) {
         this.stats.add(Utils.notNull(stat));
         Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
-            this.registry.registerMetric(metric);
-            this.metrics.add(metric);
+            final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
+            if (!metrics.containsKey(metric.metricName())) {
+                registry.registerMetric(metric);
+                metrics.put(metric.metricName(), metric);
+            }
         }
         return true;
     }
@@ -247,24 +252,30 @@ public boolean add(MetricName metricName, MeasurableStat stat) {
 
     /**
      * Register a metric with this sensor
+     *
      * @param metricName The name of the metric
-     * @param stat The statistic to keep
-     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     * @param stat       The statistic to keep
+     * @param config     A special configuration for this metric. If null use the sensor default configuration.
      * @return true if metric is added to sensor, false if sensor is expired
      */
-    public synchronized boolean add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
-        if (hasExpired())
+    public synchronized boolean add(final MetricName metricName, final MeasurableStat stat, final MetricConfig config) {
+        if (hasExpired()) {
             return false;
-
-        KafkaMetric metric = new KafkaMetric(new Object(),
-                                             Utils.notNull(metricName),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : config,
-                                             time);
-        this.registry.registerMetric(metric);
-        this.metrics.add(metric);
-        this.stats.add(stat);
-        return true;
+        } else if (metrics.containsKey(metricName)) {
+            return true;
+        } else {
+            final KafkaMetric metric = new KafkaMetric(
+                new Object(),
+                Utils.notNull(metricName),
+                Utils.notNull(stat),
+                config == null ? this.config : config,
+                time
+            );
+            registry.registerMetric(metric);
+            metrics.put(metric.metricName(), metric);
+            stats.add(stat);
+            return true;
+        }
     }
 
     /**
@@ -276,6 +287,6 @@ public boolean hasExpired() {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
+        return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
 }
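
The practical effect of keying Sensor's metrics by MetricName: adding the same metric to a
sensor twice is now an idempotent no-op instead of tripping the registry's duplicate-metric
check. A sketch (metric and group names illustrative):

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    Metrics metrics = new Metrics();
    Sensor sensor = metrics.sensor("fetch-latency");
    MetricName name = metrics.metricName("fetch-latency-avg", "consumer-fetch-metrics");
    sensor.add(name, new Avg());
    sensor.add(name, new Avg()); // returns true without registering a second metric
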
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 80ccb7e1382..078d844e85f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5502164563b..3985c7e97fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -41,7 +41,7 @@
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
 import org.apache.kafka.common.security.scram.internal.ScramServerCallbackHandler;
 import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 import org.apache.kafka.common.utils.Java;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index dba29eafe99..7ba270a6153 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -184,4 +184,8 @@ public Errors error() {
     public List<DelegationToken> tokens() {
         return tokens;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 0d43440d329..40f0aadc0bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public long expiryTimePeriod() {
         private final ByteBuffer hmac;
         private final long expiryTimePeriod;
 
-        public Builder(ByteBuffer hmac, long expiryTimePeriod) {
+        public Builder(byte[] hmac, long expiryTimePeriod) {
             super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.expiryTimePeriod = expiryTimePeriod;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
index f7e0ec44168..1a673bc7020 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
@@ -93,4 +93,8 @@ protected Struct toStruct(short version) {
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 4a4b762a72a..a65c705fb66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -88,9 +88,9 @@ public long renewTimePeriod() {
         private final ByteBuffer hmac;
         private final long renewTimePeriod;
 
-        public Builder(ByteBuffer hmac, long renewTimePeriod) {
+        public Builder(byte[] hmac, long renewTimePeriod) {
             super(ApiKeys.RENEW_DELEGATION_TOKEN);
-            this.hmac = hmac;
+            this.hmac = ByteBuffer.wrap(hmac);
             this.renewTimePeriod = renewTimePeriod;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
index 1885b480a96..3233f5c1d7b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
@@ -93,4 +93,8 @@ public Errors error() {
     public long expiryTimestamp() {
         return expiryTimestamp;
     }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
index deee0b8fb33..60857279f59 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramSaslServer.java
@@ -40,7 +40,7 @@
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
index 377aa3d3df5..9a3f0dc66b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramServerCallbackHandler.java
@@ -29,8 +29,8 @@
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCredentialCallback;
 
 public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
index 05ccbda2fe6..e1f97c1b72d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Base64;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+/**
+ * A class representing a delegation token.
+ */
+@InterfaceStability.Evolving
 public class DelegationToken {
     private TokenInformation tokenInformation;
     private byte[] hmac;
@@ -42,10 +47,6 @@ public String hmacAsBase64String() {
         return Base64.encoder().encodeToString(hmac);
     }
 
-    public ByteBuffer hmacBuffer() {
-        return ByteBuffer.wrap(hmac);
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index 1d500d21eef..ffd2af3f1c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -16,11 +16,17 @@
  */
 package org.apache.kafka.common.security.token.delegation;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
+/**
+ * A class representing the details of a delegation token.
+ */
+@InterfaceStability.Evolving
 public class TokenInformation {
 
     private KafkaPrincipal owner;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
similarity index 95%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
index adea210e678..c05b7350b41 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCache.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
 import org.apache.kafka.common.security.scram.internal.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.internal.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
 
 import java.util.Collection;
 import java.util.HashMap;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
similarity index 94%
rename from clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
rename to clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
index 7490a3e91b1..294d7b1d3a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internal/DelegationTokenCredentialCallback.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.security.token.delegation;
+package org.apache.kafka.common.security.token.delegation.internal;
 
 import org.apache.kafka.common.security.scram.ScramCredentialCallback;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0d4dee65c1c..a242ed66060 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -18,6 +18,9 @@
 
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
@@ -47,10 +50,15 @@
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -64,6 +72,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -81,6 +90,7 @@
 import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -636,6 +646,188 @@ public void testDeleteRecords() throws Exception {
         }
     }
 
+    // Ignored: to be fixed in a follow-up PR
+    @Ignore
+    @Test
+    public void testListConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(
+                new MetadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    new ArrayList<MetadataResponse.TopicMetadata>()));
+
+            env.kafkaClient().prepareResponse(
+                new ListGroupsResponse(
+                    Errors.NONE,
+                    Arrays.asList(
+                        new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
+                        new ListGroupsResponse.Group("group-connect-1", "connector")
+                    )));
+
+            final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
+            final List<ConsumerGroupListing> consumerGroups = new ArrayList<>();
+
+            final KafkaFuture<Collection<ConsumerGroupListing>> listings = result.listings();
+            consumerGroups.addAll(listings.get());
+            assertEquals(1, consumerGroups.size());
+        }
+    }
+
+    @Test
+    public void testDescribeConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, DescribeGroupsResponse.GroupMetadata> groupMetadataMap = new HashMap<>();
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+
+            final List<TopicPartition> topicPartitions = new ArrayList<>();
+            topicPartitions.add(0, myTopicPartition0);
+            topicPartitions.add(1, myTopicPartition1);
+            topicPartitions.add(2, myTopicPartition2);
+
+            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+
+            groupMetadataMap.put(
+                "group-0",
+                new DescribeGroupsResponse.GroupMetadata(
+                    Errors.NONE,
+                    "",
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    "",
+                    Arrays.asList(
+                        new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
+                        new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
+            groupMetadataMap.put(
+                "group-connect-0",
+                new DescribeGroupsResponse.GroupMetadata(
+                    Errors.NONE,
+                    "",
+                    "connect",
+                    "",
+                    Arrays.asList(
+                        new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment),
+                        new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment))));
+
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap));
+
+            final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
+            final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture = result.describedGroups().get().get("group-0");
+            final ConsumerGroupDescription groupDescription = groupDescriptionFuture.get();
+
+            assertEquals(1, result.describedGroups().get().size());
+            assertEquals("group-0", groupDescription.groupId());
+            assertEquals(2, groupDescription.members().size());
+        }
+    }
+
+    @Test
+    public void testDescribeConsumerGroupOffsets() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+
+            final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
+            responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, "", Errors.NONE));
+            responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, "", Errors.NONE));
+            responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20, "", Errors.NONE));
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
+
+            final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
+
+            assertEquals(3, result.partitionsToOffsetAndMetadata().get().size());
+            final TopicPartition topicPartition = result.partitionsToOffsetAndMetadata().get().keySet().iterator().next();
+            assertEquals("my_topic", topicPartition.topic());
+            final OffsetAndMetadata offsetAndMetadata = result.partitionsToOffsetAndMetadata().get().values().iterator().next();
+            assertEquals(10, offsetAndMetadata.offset());
+        }
+    }
+
+    @Test
+    public void testDeleteConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final List<String> groupIds = Collections.singletonList("group-0");
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, Errors> response = new HashMap<>();
+            response.put("group-0", Errors.NONE);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
+
+            final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
+
+            final Map<String, KafkaFuture<Void>> results = result.deletedGroups().get();
+            assertNull(results.get("group-0").get());
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, collection.contains(element));
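For orientation, here is a minimal sketch of how a caller might drive the four new consumer-group admin APIs these tests cover, end to end. The broker address and group id are hypothetical, and the result-accessor shapes (listings(), describedGroups(), partitionsToOffsetAndMetadata(), deletedGroups()) are taken from the assertions above rather than from any published javadoc:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class GroupAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        try (AdminClient admin = AdminClient.create(props)) {
            // List all groups; per the ignored test above, only groups using the
            // consumer protocol type are expected in the listing.
            Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().listings().get();
            System.out.println("groups: " + groups.size());

            // Describe a single group and inspect its members.
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList("group-0"))
                    .describedGroups().get().get("group-0").get();
            System.out.println("members: " + description.members().size());

            // Fetch the group's committed offsets.
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("group-0")
                    .partitionsToOffsetAndMetadata().get();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
                System.out.println(entry.getKey() + " -> " + entry.getValue().offset());

            // Delete the group once it has no active members; a null future result means success.
            admin.deleteConsumerGroups(Collections.singletonList("group-0"))
                    .deletedGroups().get().get("group-0").get();
        }
    }
}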
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c141a8acac9..2fc7048b759 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -276,6 +276,46 @@ public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> re
         }
     }
 
+    @Override
+    public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
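The delegation-token methods stubbed out above are the test-double side of the new public AdminClient API. As a hedged sketch of the create/renew/expire round trip against an already-configured `admin` client on a SASL-enabled cluster (the lifetime values are arbitrary; option and result accessor names follow the DelegationTokenCommand rework further down):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

public class DelegationTokenRoundTrip {
    // `admin` is assumed to be configured against a SASL-enabled cluster.
    static void roundTrip(AdminClient admin) throws Exception {
        // Create a token with a one-hour maximum lifetime (value is arbitrary).
        DelegationToken token = admin
                .createDelegationToken(new CreateDelegationTokenOptions().maxlifeTimeMs(3600000L))
                .delegationToken().get();

        // Renew using the token's HMAC; the result carries the new expiry timestamp.
        long newExpiry = admin
                .renewDelegationToken(token.hmac(),
                        new RenewDelegationTokenOptions().renewTimePeriodMs(3600000L))
                .expiryTimestamp().get();

        // A negative expiry period invalidates the token immediately.
        admin.expireDelegationToken(token.hmac(),
                new ExpireDelegationTokenOptions().expiryTimePeriodMs(-1L))
                .expiryTimestamp().get();

        System.out.println("token " + token.tokenInfo().tokenId() + " was renewed to " + newExpiry);
    }
}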
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8c147a58f77..52d1579a51f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1191,6 +1191,37 @@ public void testPollWithEmptyUserAssignment() {
         }
     }
 
+    @Test
+    public void testPollWithAllBootstrapServersDown() throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final long pollTimeout = 1000;
+            final AtomicBoolean pollComplete = new AtomicBoolean();
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Properties props = new Properties();
+                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
+                    try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
+                        consumer.subscribe(Arrays.asList(topic));
+                        try {
+                            consumer.poll(pollTimeout);
+                        } catch (Exception ex) {
+                            ex.printStackTrace();
+                        } finally {
+                            pollComplete.set(true);
+                        }
+                    }
+                }
+            });
+
+            Thread.sleep(pollTimeout * 2);
+            Assert.assertTrue("poll(timeout) did not return when all bootstrap servers were down", pollComplete.get());
+        } finally {
+            executor.shutdown();
+        }
+    }
+
     @Test
     public void testGracefulClose() throws Exception {
         Map<TopicPartition, Errors> response = new HashMap<>();
@@ -1306,7 +1337,7 @@ public boolean matches(final AbstractRequest body) {
         }, fetchResponse(tp0, 1, 1), node);
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
-        final ConsumerRecords<String, String> records = consumer.poll(0);
+        final ConsumerRecords<String, String> records = consumer.poll(200);
         assertFalse(records.isEmpty());
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
@@ -1336,7 +1367,7 @@ private void consumerCloseTest(final long closeTimeoutMs,
         // Poll with responses
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
         client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
-        consumer.poll(0);
+        consumer.poll(200);
 
         // Initiate close() after a commit request on another thread.
         // Kafka consumer is single-threaded, but the implementation allows calls on a
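The contract the new testPollWithAllBootstrapServersDown pins down is easiest to see from the caller's side: poll(timeout) should return empty within roughly the requested timeout even when no broker is reachable, rather than blocking forever. A minimal sketch; the topic name and the deliberately unreachable port are arbitrary:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.Properties;

public class BoundedPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // nothing listening here
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            long start = System.currentTimeMillis();
            // With the fix, this returns empty after roughly 1000 ms even though no
            // bootstrap server is reachable; before it, the call could block forever.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            System.out.printf("poll returned %d records after %d ms%n",
                    records.count(), System.currentTimeMillis() - start);
        }
    }
}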
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 1c88803e26c..2fecfe5ea6f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -129,7 +129,7 @@ public boolean matches(AbstractRequest body) {
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             mockTime.sleep(HEARTBEAT_INTERVAL_MS);
             long startMs = System.currentTimeMillis();
             while (System.currentTimeMillis() - startMs < 1000) {
@@ -150,7 +150,7 @@ public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         final CountDownLatch heartbeatDone = new CountDownLatch(1);
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -208,7 +208,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -217,7 +217,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -246,7 +246,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -257,7 +257,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -284,7 +284,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -293,7 +293,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -320,7 +320,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -331,7 +331,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -360,7 +360,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -369,7 +369,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -398,7 +398,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -409,7 +409,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -436,7 +436,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -445,7 +445,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -472,7 +472,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -481,7 +481,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -500,7 +500,7 @@ public void testWakeupInOnJoinComplete() throws Exception {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -512,7 +512,7 @@ public void testWakeupInOnJoinComplete() throws Exception {
         // the join group completes in this poll()
         coordinator.wakeupOnJoinComplete = false;
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
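All the test call sites above now pass (0, Long.MAX_VALUE), i.e. an effectively unbounded budget, while production callers (see WorkerCoordinator below) pass a real start time and remaining budget. The new declaration itself is not in this excerpt, so the parameter semantics are inferred from the call sites; as a self-contained sketch of the deadline arithmetic such a bounded API needs, including the overflow case for Long.MAX_VALUE:

public final class TimeBudget {
    private final long deadlineMs;

    public TimeBudget(long startMs, long timeoutMs) {
        long deadline = startMs + timeoutMs;
        // startMs + Long.MAX_VALUE overflows; clamp that to "no deadline".
        this.deadlineMs = deadline < startMs ? Long.MAX_VALUE : deadline;
    }

    public long remainingMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        TimeBudget bounded = new TimeBudget(0, 1000);
        TimeBudget unbounded = new TimeBudget(0, Long.MAX_VALUE);
        System.out.println(bounded.remainingMs(400));    // 600
        System.out.println(unbounded.remainingMs(400));  // effectively unlimited
    }
}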
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423a428..83912bdfe4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -566,7 +566,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -605,7 +605,7 @@ public boolean matches(AbstractRequest body) {
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(2, subscriptions.assignedPartitions().size());
@@ -672,7 +672,7 @@ public void testUnexpectedErrorOnSyncGroup() {
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     @Test
@@ -698,7 +698,7 @@ public boolean matches(AbstractRequest body) {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -721,7 +721,7 @@ public void testRebalanceInProgressOnSyncGroup() {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -750,7 +750,7 @@ public boolean matches(AbstractRequest body) {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -937,7 +937,7 @@ public void testRejoinGroup() {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(t1p), rebalanceListener.revoked);
@@ -957,7 +957,7 @@ public void testDisconnectInJoin() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -975,7 +975,7 @@ public void testInvalidSessionTimeout() {
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     @Test
@@ -1132,7 +1132,7 @@ public void testAutoCommitDynamicAssignmentRebalance() {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         subscriptions.seek(t1p, 100);
 
@@ -1574,7 +1574,7 @@ public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure(
         client.authenticationFailed(node, 300);
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
@@ -1584,7 +1584,7 @@ public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure(
         assertTrue(client.connectionFailed(node));
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
@@ -1703,7 +1703,7 @@ public void testCloseNoWait() throws Exception {
     public void testHeartbeatThreadClose() throws Exception {
         groupId = "testCloseTimeoutWithHeartbeatThread";
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
         time.sleep(heartbeatIntervalMs + 100);
         Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
@@ -1744,7 +1744,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-            coordinator.joinGroupIfNeeded();
+            coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
         } else
             subscriptions.assignFromUser(singleton(t1p));
 
@@ -1911,7 +1911,7 @@ private void joinAsFollowerAndReceiveAssignment(String consumerId,
         coordinator.ensureCoordinatorReady();
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(assignment, Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 3f7551ef985..a1c0814c343 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -16,23 +16,25 @@
  */
 package org.apache.kafka.common.metrics;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class SensorTest {
     @Test
     public void testRecordLevelEnum() {
@@ -94,4 +96,32 @@ public void testExpiredSensor() {
 
         metrics.close();
     }
+
+    @Test
+    public void testIdempotentAdd() {
+        final Metrics metrics = new Metrics();
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
+
+        // adding the same metric to the same sensor is a no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
+
+
+        // but adding the same metric to a DIFFERENT sensor is an error
+        final Sensor anotherSensor = metrics.sensor("another-sensor");
+        try {
+            anotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());
+            fail("should have thrown");
+        } catch (final IllegalArgumentException ignored) {
+            // pass
+        }
+
+        // note that adding a different metric with the same name is also a no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));
+
+        // so after all this, we still just have the original metric registered
+        assertEquals(1, sensor.metrics().size());
+        assertEquals(Avg.class, sensor.metrics().get(0).measurable().getClass());
+    }
 }
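The new testIdempotentAdd pins down three rules: re-adding the same metric to the same sensor is a harmless no-op, adding it to a different sensor throws IllegalArgumentException, and re-adding the same name with a different stat is also ignored. A minimal sketch against the public Metrics API, assuming Sensor.add returns a boolean as the test exercises:

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class IdempotentSensorAddSketch {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            Sensor sensor = metrics.sensor("request-latency");
            MetricName name = metrics.metricName("latency-avg", "demo-group");

            boolean first = sensor.add(name, new Avg());   // registers the metric
            boolean second = sensor.add(name, new Avg());  // no-op on the same sensor, still true
            System.out.println(first + " " + second + " " + sensor.metrics().size()); // true true 1
        }
    }
}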
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fab8e934d8e..68979a1e01f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -44,7 +44,7 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index bdbd1062685..c63cecdda28 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1223,7 +1223,7 @@ private CreateDelegationTokenResponse createCreateTokenResponse() {
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest() {
-        return new RenewDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+        return new RenewDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
     }
 
     private RenewDelegationTokenResponse createRenewTokenResponse() {
@@ -1231,7 +1231,7 @@ private RenewDelegationTokenResponse createRenewTokenResponse() {
     }
 
     private ExpireDelegationTokenRequest createExpireTokenRequest() {
-        return new ExpireDelegationTokenRequest.Builder(ByteBuffer.wrap("test".getBytes()), System.currentTimeMillis()).build();
+        return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
     }
 
     private ExpireDelegationTokenResponse createExpireTokenResponse() {
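The two changes above are a pure signature cleanup: the renew and expire request builders now take the raw HMAC bytes instead of a ByteBuffer wrapping them. Side by side, as a sketch:

import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;

public class TokenRequestBuilderSketch {
    public static void main(String[] args) {
        byte[] hmac = "test".getBytes();  // previously: ByteBuffer.wrap("test".getBytes())
        RenewDelegationTokenRequest renew =
                new RenewDelegationTokenRequest.Builder(hmac, System.currentTimeMillis()).build();
        ExpireDelegationTokenRequest expire =
                new ExpireDelegationTokenRequest.Builder(hmac, System.currentTimeMillis()).build();
        System.out.println(renew + " / " + expire);
    }
}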
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
index 3c4b82d7921..f6e43f9edcf 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramSaslServerTest.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache;
 
 import org.junit.Before;
 import org.junit.Test;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 60407c1d0dc..9de9d6d2170 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -107,7 +107,7 @@ public void poll(long timeout) {
         // poll for io until the timeout expires
         final long start = time.milliseconds();
         long now = start;
-        long remaining;
+        long remaining = timeout;
 
         do {
             if (coordinatorUnknown()) {
@@ -116,7 +116,7 @@ public void poll(long timeout) {
             }
 
             if (needRejoin()) {
-                ensureActiveGroup();
+                ensureActiveGroup(now, remaining);
                 now = time.milliseconds();
             }
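This one-line initialization matters because the new two-argument ensureActiveGroup(now, remaining) consumes `remaining` on the very first loop iteration, so it must start at the full timeout instead of being computed only later in the loop. A self-contained sketch of the corrected bookkeeping (all names illustrative):

import java.util.concurrent.TimeUnit;

public class PollBudgetSketch {
    static void doBoundedWork(long nowMs, long remainingMs) {
        System.out.printf("working at %d with %d ms left%n", nowMs, remainingMs);
    }

    public static void main(String[] args) throws InterruptedException {
        final long timeout = 500;
        final long start = System.currentTimeMillis();
        long now = start;
        long remaining = timeout;  // the fix: a real budget exists before the first use

        do {
            doBoundedWork(now, remaining);  // stands in for ensureActiveGroup(now, remaining)
            TimeUnit.MILLISECONDS.sleep(100);
            now = System.currentTimeMillis();
            remaining = timeout - (now - start);
        } while (remaining > 0);
    }
}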
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 60bd863c33e..25395f97cc6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -20,11 +20,10 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
-import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -145,21 +144,21 @@ public void testMetricGroupIdWithoutTags() {
 
     @Test
     public void testRecreateWithClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
-        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, true);
+        // because we closed the metricGroup, we get a brand-new sensor
+        assertNotSame(originalSensor, recreatedSensor);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void testRecreateWithoutClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
-        // we should never get here
-        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
-                + " metrics in recreated group. Original=" + numMetrics);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, false);
+        // since we didn't close the group, the second addToGroup is idempotent
+        assertSame(originalSensor, recreatedSensor);
     }
 
-    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+    private Sensor addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
         ConnectMetricsRegistry registry = connectMetrics.registry();
         ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
                 registry.connectorTagName(), "conn_name");
@@ -172,7 +171,7 @@ private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
         sensor.add(metricName("x1"), new Max());
         sensor.add(metricName("y2"), new Avg());
 
-        return metricGroup.metrics().metrics().size();
+        return sensor;
     }
 
     static MetricName metricName(String name) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 154b1df5e73..59f4e2a5fed 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -225,7 +225,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(0, rebalanceListener.revokedCount);
@@ -262,7 +262,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(0, rebalanceListener.revokedCount);
@@ -307,7 +307,7 @@ public boolean matches(AbstractRequest body) {
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
                 Collections.<String>emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         PowerMock.verifyAll();
     }
@@ -326,7 +326,7 @@ public void testRejoinGroup() {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -340,7 +340,8 @@ public void testRejoinGroup() {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
+
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index c010ba0a4ac..bcc11fd4917 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -35,8 +35,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
@@ -342,33 +340,6 @@ class AdminClient(val time: Time,
     ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
   }
 
-  def createToken(renewers: List[KafkaPrincipal], maxTimePeriodMs: Long = -1): (Errors, DelegationToken) = {
-    val responseBody = sendAnyNode(ApiKeys.CREATE_DELEGATION_TOKEN, new CreateDelegationTokenRequest.Builder(renewers.asJava, maxTimePeriodMs))
-    val response = responseBody.asInstanceOf[CreateDelegationTokenResponse]
-    val tokenInfo = new TokenInformation(response.tokenId, response.owner, renewers.asJava,
-      response.issueTimestamp, response.maxTimestamp, response.expiryTimestamp)
-    (response.error, new DelegationToken(tokenInfo, response.hmacBytes))
-  }
-
-  def renewToken(hmac: ByteBuffer, renewTimePeriod: Long = -1): (Errors, Long) = {
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new RenewDelegationTokenRequest.Builder(hmac, renewTimePeriod))
-    val response = responseBody.asInstanceOf[RenewDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def expireToken(hmac: ByteBuffer, expiryTimeStamp: Long = -1): (Errors, Long) = {
-    val responseBody = sendAnyNode(ApiKeys.EXPIRE_DELEGATION_TOKEN, new ExpireDelegationTokenRequest.Builder(hmac, expiryTimeStamp))
-    val response = responseBody.asInstanceOf[ExpireDelegationTokenResponse]
-    (response.error, response.expiryTimestamp)
-  }
-
-  def describeToken(owners: List[KafkaPrincipal]): (Errors, List[DelegationToken]) = {
-    val ownersList = if (owners == null) null else owners.asJava
-    val responseBody = sendAnyNode(ApiKeys.RENEW_DELEGATION_TOKEN, new DescribeDelegationTokenRequest.Builder(ownersList))
-    val response = responseBody.asInstanceOf[DescribeDelegationTokenResponse]
-    (response.error, response.tokens().asScala.toList)
-  }
-
   def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
 
     def coordinatorLookup(group: String): Either[Node, Errors] = {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 6c5d1ce925d..0e6ea86034b 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -17,12 +17,13 @@
 
 package kafka.admin
 
-import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
 
-import joptsimple._
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
@@ -71,19 +72,20 @@ object DelegationTokenCommand extends Logging {
     }
   }
 
-  def createToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
-    val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt)
+  def createToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): DelegationToken = {
+    val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]())
     val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue
 
     println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs)
-    val response = adminClient.createToken(renewerPrincipals, maxLifeTimeMs)
-    response  match {
-        case (Errors.NONE, token) => println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
-        case (e, _) =>  throw new AdminOperationException(e.message)
-    }
+    val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals)
+    val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+    val token = createResult.delegationToken().get()
+    println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token))
+    token
   }
 
   def printToken(tokens: List[DelegationToken]): Unit = {
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
     print("\n%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
     for (token <- tokens) {
       val tokenInfo = token.tokenInfo
@@ -92,56 +94,59 @@ object DelegationTokenCommand extends Logging {
         token.hmacAsBase64String,
         tokenInfo.owner,
         tokenInfo.renewersAsString,
-        tokenInfo.issueTimestamp,
-        tokenInfo.expiryTimestamp,
-        tokenInfo.maxTimestamp))
+        dateFormat.format(tokenInfo.issueTimestamp),
+        dateFormat.format(tokenInfo.expiryTimestamp),
+        dateFormat.format(tokenInfo.maxTimestamp)))
       println()
     }
   }
 
-  private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): List[KafkaPrincipal] = {
+  private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList
+      Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava)
     else
-      List.empty[KafkaPrincipal]
+      None
   }
 
-  def renewToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  def renewToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
     println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
-    val response = adminClient.renewToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), renewTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed renew operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
+    val expiryTimeStamp = renewResult.expiryTimestamp().get()
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  def expireToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): Long = {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
     println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
-    val response = adminClient.expireToken(ByteBuffer.wrap(Base64.decoder.decode(hmac)), expiryTimePeriodMs)
-    response match {
-      case (Errors.NONE, expiryTimeStamp) => println("Completed expire operation. New expiry timestamp : %s".format(expiryTimeStamp))
-      case (e, expiryTimeStamp) => throw new AdminOperationException(e.message)
-    }
+    val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
+    val expiryTimeStamp = expireResult.expiryTimestamp().get()
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+    println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
+    expiryTimeStamp
   }
 
-  def describeToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
+  def describeToken(adminClient: JAdminClient, opts: DelegationTokenCommandOptions): List[DelegationToken] = {
     val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt)
-    println("Calling describe token operation for owners :" + ownerPrincipals)
-    val response = adminClient.describeToken(ownerPrincipals)
-    response  match {
-      case (Errors.NONE, tokens) => println("Total Number of tokens : %s".format(tokens.size)); printToken(tokens)
-      case (e, tokens) => throw new AdminOperationException(e.message)
-    }
+    if (ownerPrincipals.isEmpty)
+      println("Calling describe token operation for current user.")
+    else
+      println("Calling describe token operation for owners :" + ownerPrincipals.get)
+
+    val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull))
+    val tokens = describeResult.delegationTokens().get().asScala.toList
+    println("Total number of tokens : %s".format(tokens.size)); printToken(tokens)
+    tokens
   }
 
-  private def createAdminClient(opts: DelegationTokenCommandOptions): AdminClient = {
+  private def createAdminClient(opts: DelegationTokenCommandOptions): JAdminClient = {
     val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-    AdminClient.create(props)
+    JAdminClient.create(props)
   }
 
   class DelegationTokenCommandOptions(args: Array[String]) {
@@ -157,10 +162,11 @@ object DelegationTokenCommand extends Logging {
       .withRequiredArg
       .ofType(classOf[String])
 
-    val createOpt = parser.accepts("create", "Create a new delegation token.")
-    val renewOpt = parser.accepts("renew",  "Renew delegation token.")
-    val expiryOpt = parser.accepts("expire", "Expire delegation token.")
-    val describeOpt = parser.accepts("describe", "describe delegation tokens.")
+    val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.")
+    val renewOpt = parser.accepts("renew",  "Renew delegation token. Use --renew-time-period option to set renew time period.")
+    val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.")
+    val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+      " If the --owner-principal option is not supplied, all tokens owned by the user, and tokens the user has Describe permission on, will be returned.")
 
    val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a Kafka principal. It should be in principalType:name format.")
       .withOptionalArg()
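The describe path above distinguishes "no --owner-principal supplied" (null owners, i.e. every token the caller owns or may describe) from an explicit owner list. A sketch of the equivalent direct AdminClient call; the principal name is hypothetical and `admin` is assumed configured:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

import java.util.Collections;
import java.util.List;

public class DescribeTokensSketch {
    static List<DelegationToken> describeFor(AdminClient admin, String owner) throws Exception {
        // owners(null) mirrors the command's behavior when --owner-principal is absent.
        DescribeDelegationTokenOptions options = new DescribeDelegationTokenOptions()
                .owners(owner == null
                        ? null
                        : Collections.singletonList(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, owner)));
        return admin.describeDelegationToken(options).delegationTokens().get();
    }
}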
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 74bc59faee2..6805e321393 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
       if (leaderIsrAndControllerEpochOpt.nonEmpty) {
         val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
         val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
@@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
 }
 
 object PartitionLeaderElectionAlgorithms {
-  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
     assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
       if (uncleanLeaderElectionEnabled) {
-        assignment.find(liveReplicas.contains)
+        val leaderOpt = assignment.find(liveReplicas.contains)
+        if (leaderOpt.nonEmpty)
+          controllerContext.stats.uncleanLeaderElectionRate.mark()
+        leaderOpt
       } else {
         None
       }
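
The fallback branch above is the only place an unclean election can occur, which is why the metric is marked there. A condensed sketch of the rule, with the metric marking elided:

    // Prefer a live replica still in the ISR; otherwise, only if unclean
    // election is enabled, settle for any live replica (possible data loss).
    def electOffline(assignment: Seq[Int], isr: Seq[Int], live: Set[Int],
                     uncleanEnabled: Boolean): Option[Int] =
      assignment.find(id => live.contains(id) && isr.contains(id)).orElse {
        if (uncleanEnabled) assignment.find(live.contains) else None
      }

    // e.g. electOffline(Seq(2, 4), isr = Seq(2), live = Set(4), uncleanEnabled = true) == Some(4)
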
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b7090761..cbbd91396b2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
       responseCallback(joinError(memberId, error))
       return
     }
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
         // The coordinator is loading, which means we've lost the state of the active rebalance and the
         // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
       responseCallback(error)
       return
     }
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
     var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
 
     groupIds.foreach { groupId =>
-      validateGroup(groupId) match {
+      validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
         case Some(error) =>
           groupErrors += groupId -> error
 
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
       if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
         // the group is still loading, so respond just blindly
         responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
                              producerEpoch: Short,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
   (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
 
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
       case Some(error) => error -> Map.empty
       case None =>
         // return offsets blindly regardless the current group state since the group may be using
@@ -543,7 +543,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
       case Some(error) => (error, GroupCoordinator.EmptyGroup)
       case None =>
         groupManager.getGroup(groupId) match {
@@ -563,8 +563,23 @@ class GroupCoordinator(val brokerId: Int,
     info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
   }
 
-  private def validateGroup(groupId: String): Option[Errors] = {
-    if (!validGroupId(groupId))
+  private def isValidGroupId(groupId: String, api: ApiKeys): Boolean = {
+    api match {
+      case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS =>
+        // For backwards compatibility, the offset commit/fetch APIs accept the empty groupId, as do
+        // DescribeGroups and DeleteGroups, so that users can view and delete the state of such groups.
+        groupId != null
+      case _ =>
+        // The remaining APIs are only used by groups relying on Kafka for group coordination and must have a non-empty groupId
+        groupId != null && !groupId.isEmpty
+    }
+  }
+
+  /**
+   * Check that the groupId is valid, assigned to this coordinator and that the group has been loaded.
+   */
+  private def validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors] = {
+    if (!isValidGroupId(groupId, api))
       Some(Errors.INVALID_GROUP_ID)
     else if (!isActive.get)
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -648,10 +663,6 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  private def validGroupId(groupId: String): Boolean = {
-    groupId != null && !groupId.isEmpty
-  }
-
   private def joinError(memberId: String, error: Errors): JoinGroupResult = {
     JoinGroupResult(
       members = Map.empty,
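
The practical effect of the per-API dispatch is easiest to see on the empty groupId, which old clients used purely for offset storage. A self-contained restatement for illustration:

    import org.apache.kafka.common.protocol.ApiKeys

    def groupIdOk(groupId: String, api: ApiKeys): Boolean = api match {
      case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH |
           ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS => groupId != null
      case _ => groupId != null && groupId.nonEmpty
    }

    assert(groupIdOk("", ApiKeys.OFFSET_COMMIT))  // "" kept for old offset-storage clients
    assert(!groupIdOk("", ApiKeys.JOIN_GROUP))    // coordination APIs need a real group
    assert(!groupIdOk(null, ApiKeys.HEARTBEAT))   // null is never valid
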
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5130b28b597..5f5f3736e81 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -34,14 +34,14 @@ import scala.collection.JavaConverters._
 import scala.math._
 
 /**
- * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
+ * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
  * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
  * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
  * any previous segment.
  *
  * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
  *
- * @param log The message set containing log entries
+ * @param log The file records containing log entries
  * @param offsetIndex The offset index
  * @param timeIndex The timestamp index
  * @param baseOffset A lower bound on the offsets in this segment
diff --git a/core/src/main/scala/kafka/log/package.html b/core/src/main/scala/kafka/log/package.html
index c6ebf0c26be..ee2f72e0095 100644
--- a/core/src/main/scala/kafka/log/package.html
+++ b/core/src/main/scala/kafka/log/package.html
@@ -21,4 +21,4 @@
 The entry point for this system is LogManager. LogManager is responsible for holding all the logs, and handing them out by topic/partition. It also handles the enforcement of the
 flush policy and retention policies.
 
-The Log itself is made up of log segments. A log is a FileMessageSet that contains the data and an OffsetIndex that supports reads by offset on the log.
\ No newline at end of file
+The Log itself is made up of log segments. A log is a FileRecords that contains the data and an OffsetIndex that supports reads by offset on the log.
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 6f9c2527735..f20808791a2 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef._
 import org.apache.kafka.common.security.scram.internal.{ScramCredentialUtils, ScramMechanism}
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 
 class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
 
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 4a947a17fcf..62a5e20322b 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -31,7 +31,8 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 71056888c73..a0d2c799e6a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 27a6d1127d5..56a3b8a3f39 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -18,10 +18,9 @@ package kafka.api
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
@@ -83,9 +82,8 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     config.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
 
-    val adminClient = AdminClient.create(config.asScala.toMap)
-    val (error, token)  = adminClient.createToken(List())
-
+    val adminClient = AdminClient.create(config)
+    val token = adminClient.createDelegationToken().delegationToken().get()
     //wait for token to reach all the brokers
     TestUtils.waitUntilTrue(() => servers.forall(server => !server.tokenCache.tokens().isEmpty),
       "Timed out waiting for token to propagate to all servers")
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e36528..83e6863dc10 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -158,7 +158,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(100)
     assertEquals(1, listener.callsToAssigned)
     assertEquals(1, listener.callsToRevoked)
 
@@ -201,11 +201,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     // force a rebalance to trigger an invocation of the revocation callback while in the group
     consumer0.subscribe(List("otherTopic").asJava, listener)
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     assertEquals(0, committedPosition)
     assertTrue(commitCompleted)
@@ -231,7 +231,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     // we should still be in the group after this invocation
     consumer0.poll(0)
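
Most of the poll(0) calls above become small non-zero timeouts because, once the timeout bounds the whole call including coordinator communication, a zero timeout can return before the group join completes. A sketch of the resulting calling pattern (connection settings are placeholders):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // may even be unreachable; poll still returns
    props.put("group.id", "demo-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("some-topic"))
    // The join may span several bounded polls; loop until assigned or a deadline passes.
    val deadline = System.currentTimeMillis() + 30000
    while (consumer.assignment().isEmpty && System.currentTimeMillis() < deadline)
      consumer.poll(100)
    consumer.close()
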
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
new file mode 100644
index 00000000000..6ae8f5e83ac
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -0,0 +1,147 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+import java.util
+
+import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions
+import kafka.api.{KafkaSasl, SaslSetup}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionException
+
+class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  var adminClient: org.apache.kafka.clients.admin.AdminClient = null
+
+  override def numBrokers = 1
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+  }
+
+  override def generateConfigs = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+      enableControlledShutdown = false, enableDeleteTopic = true,
+      interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
+    props.foreach(propertyOverrides)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  private def createAdminConfig(): util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  @Test
+  def testDelegationTokenRequests(): Unit = {
+    adminClient = org.apache.kafka.clients.admin.AdminClient.create(createAdminConfig)
+    val renewer1 = "User:renewer1"
+    val renewer2 = "User:renewer2"
+
+    // create token1 with renewer1
+    val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1)))
+
+    var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertTrue(tokens.size == 1)
+    val token1 = tokens.head
+    assertEquals(token1, tokenCreated)
+
+    // create token2 with renewer2
+    val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2)))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertTrue(tokens.size == 2)
+    assertEquals(Set(token1, token2), tokens.toSet)
+
+    //get tokens for renewer2
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2)))
+    assertTrue(tokens.size == 1)
+    assertEquals(Set(token2), tokens.toSet)
+
+    //test renewing tokens
+    val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()))
+    val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head
+    assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp())
+
+    //test expire tokens
+    DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()))
+    DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()))
+
+    tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List()))
+    assertTrue(tokens.size == 0)
+
+    //create token with invalid renewer principal type
+    intercept[ExecutionException](DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3"))))
+
+    // try describing tokens for unknown owner
+    assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty)
+  }
+
+  private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, "--max-life-time-period", "-1",
+      "--command-config", "testfile", "--create")
+    renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = {
+    val opts = ListBuffer("--bootstrap-server", brokerList, "--command-config", "testfile", "--describe")
+    owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner))
+    new DelegationTokenCommandOptions(opts.toArray)
+  }
+
+  private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--renew",
+      "--renew-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = {
+    val opts = Array("--bootstrap-server", brokerList, "--command-config", "testfile", "--expire",
+      "--expiry-time-period", "-1",
+      "--hmac", hmac)
+    new DelegationTokenCommandOptions(opts)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (adminClient != null)
+      adminClient.close()
+    super.tearDown()
+    closeSasl()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index effa55d1b2f..4cc28372449 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -59,32 +59,6 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
       result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
   }
 
-  @Test
-  def testDeleteCmdInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
-      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
-  }
-
-  @Test
-  def testDeleteInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val result = service.deleteGroups()
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
-      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
-  }
-
   @Test
   def testDeleteCmdNonEmptyGroup() {
     TestUtils.createOffsetsTopic(zkClient, servers)
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index f149fc93a49..113a39d5430 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -17,10 +17,17 @@
 package kafka.controller
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
+  }
 
   @Test
   def testOfflinePartitionLeaderElection(): Unit = {
@@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
   }
 
@@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(None, leaderOpt)
+    assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
+
   @Test
   def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = {
     val assignment = Seq(2, 4)
@@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = true)
+      uncleanLeaderElectionEnabled = true,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
+    assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
 
   @Test
@@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val reassignment = Seq(2, 4)
     val isr = Seq(2, 4)
     val liveReplicas = Set(4)
-    val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment,
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
       isr,
-      liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      liveReplicas)
     assertEquals(Option(4), leaderOpt)
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 08c13eb9195..933e91bfcec 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection._
+import scala.collection.mutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
@@ -138,7 +138,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val topicPartition = new TopicPartition("foo", 0)
     var offsetCommitErrors = Map.empty[TopicPartition, Errors]
     groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
-      immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+      Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
 
     // Heartbeat
@@ -155,7 +155,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
 
     // DeleteGroups
-    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId))
+    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
 
     // Check that non-loading groups are still accessible
@@ -452,7 +452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
@@ -820,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
@@ -830,7 +830,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
   }
 
@@ -856,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -870,7 +870,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -878,6 +878,44 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Some(0), partitionData.get(tp).map(_.offset))
   }
 
+  @Test
+  def testCommitAndFetchOffsetsWithEmptyGroup() {
+    // For backwards compatibility, the coordinator supports committing/fetching offsets with an empty groupId.
+    // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
+
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val groupId = ""
+
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+    val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(Empty.toString, summary.state)
+
+    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val partition = EasyMock.niceMock(classOf[Partition])
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.replay(replicaManager, partition)
+
+    val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
+    assertEquals(Errors.NONE, deleteErrors(groupId))
+
+    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, err)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tp).map(_.offset))
+  }
+
   @Test
   def testBasicFetchTxnOffsets() {
     val tp = new TopicPartition("topic", 0)
@@ -885,7 +923,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -912,7 +950,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -936,7 +974,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -975,16 +1013,16 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // Ensure that the two groups map to different partitions.
     assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
 
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
-    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize its group offsets.
@@ -1051,16 +1089,16 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // producer0 commits the offsets for partition0
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
 
     // producer1 commits the offsets for partition1
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
@@ -1123,7 +1161,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
     assertEquals(Errors.NONE, commitOffsetResult(tp1))
     assertEquals(Errors.NONE, commitOffsetResult(tp2))
     assertEquals(Errors.NONE, commitOffsetResult(tp3))
@@ -1150,7 +1188,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
     assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
   }
 
@@ -1332,20 +1370,20 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     joinGroup(groupId, memberId, protocolType, protocols)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
   }
 
   @Test
   def testDeleteGroupWithInvalidGroupId() {
-    val invalidGroupId = ""
-    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet)
+    val invalidGroupId = null
+    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
     assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
   }
 
   @Test
   def testDeleteGroupWithWrongCoordinator() {
-    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
   }
 
@@ -1367,7 +1405,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
   }
 
@@ -1388,7 +1426,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
@@ -1408,7 +1446,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
 
     assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
@@ -1535,7 +1573,7 @@ class GroupCoordinatorTest extends JUnitSuite {
                                   assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1616,10 +1654,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                            offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1646,10 +1684,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitTransactionalOffsets(groupId: String,
                                          producerId: Long,
                                          producerEpoch: Short,
-                                         offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                                         offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
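
A note on the Map changes through this file: after narrowing the wildcard import to scala.collection.mutable, a bare Map resolves to scala.Predef's immutable Map, while the appendRecords callback parameter is the read-only supertype, hence the explicit scala.collection.Map in the captures. A tiny illustration:

    // Both mutable and immutable maps widen to the common scala.collection.Map.
    val imm: Map[String, Int] = Map("a" -> 1)          // scala.Predef.Map (immutable)
    val mut = scala.collection.mutable.Map("b" -> 2)
    val ro1: scala.collection.Map[String, Int] = imm
    val ro2: scala.collection.Map[String, Int] = mut
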
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5269f92d358..608f3a6f561 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     produceMessage(servers, topic, "third")
 
@@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // verify that unclean election to non-ISR follower does not occur
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
+    assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     // message production and consumption should both fail while leader is down
     try {
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 74f3ad1efea..370a1ad12b3 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -404,8 +404,11 @@ class AsyncProducerTest {
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     // don't care about config mock
-    val mockConfig = EasyMock.createNiceMock(classOf[SyncProducerConfig])
-    EasyMock.expect(mockSyncProducer.config).andReturn(mockConfig).anyTimes()
+    val myProps = new Properties()
+    myProps.put("host", "localhost")
+    myProps.put("port", "9092")
+    val myConfig = new SyncProducerConfig(myProps)
+    EasyMock.expect(mockSyncProducer.config).andReturn(myConfig).anyTimes()
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
     EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index b8388b4cb49..6093622a188 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, DelegationTokenCache, TokenInformation}
+import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
+import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 4c42dd27724..3d4be531985 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -19,14 +19,13 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   var adminClient: AdminClient = null
@@ -49,21 +48,19 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val createResponse = adminClient.createToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, createResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    assert(intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, describeResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    assert(intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, renewResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
 
-    //test expire tokens tokens
-    val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, expireResponse._1)
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    assert(intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[UnsupportedByAuthenticationException])
   }
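
The assertion style used in these token tests relies on KafkaFuture.get() wrapping the broker-side error as the cause of an ExecutionException. An illustrative helper (not part of the patch) capturing that shape:

    import java.util.concurrent.ExecutionException
    import org.junit.Assert.assertTrue

    // Run a block that ends in KafkaFuture.get() and assert the wrapped cause type.
    def assertCause(expected: Class[_ <: Throwable])(body: => Any): Unit =
      try {
        body
        throw new AssertionError(s"expected ExecutionException caused by ${expected.getName}")
      } catch {
        case e: ExecutionException => assertTrue(expected.isInstance(e.getCause))
      }
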
 
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 55bf5fd022c..a00275084fb 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -18,17 +18,17 @@ package kafka.server
 
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateDelegationTokenOptions, DescribeDelegationTokenOptions}
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.SecurityUtils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -46,15 +46,6 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
     super.setUp()
   }
 
-  def createAdminConfig():util.Map[String, Object] = {
-    val config = new util.HashMap[String, Object]
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
-    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
-    config
-  }
-
   override def generateConfigs = {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
@@ -64,46 +55,73 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
     props.map(KafkaConfig.fromProps)
   }
 
+  private def createAdminConfig(): util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    // test creating token
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
-    val tokenResult1 = adminClient.createToken(renewer1)
-    assertEquals(Errors.NONE, tokenResult1._1)
-    var token1 = adminClient.describeToken(null)._2.head
-    assertEquals(token1, tokenResult1._2)
+    adminClient = AdminClient.create(createAdminConfig)
+
+    // create token1 with renewer1
+    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:renewer1")).asJava
+    val createResult1 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer1))
+    val tokenCreated = createResult1.delegationToken().get()
+
+    //test describe token
+    var tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.size() == 1)
+    var token1 = tokens.get(0)
+    assertEquals(token1, tokenCreated)
+
+    // create token2 with renewer2
+    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:renewer2")).asJava
+    val createResult2 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer2))
+    val token2 = createResult2.delegationToken().get()
+
+    //get all tokens
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.size() == 2)
+    assertEquals(Set(token1, token2), tokens.asScala.toSet)
+
+    //get tokens for renewer2
+    tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(renewer2)).delegationTokens().get()
+    assertTrue(tokens.size() == 1)
+    assertEquals(Set(token2), tokens.asScala.toSet)
 
     //test renewing tokens
-    val renewResponse = adminClient.renewToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, renewResponse._1)
-
-    token1 = adminClient.describeToken(null)._2.head
-    assertEquals(renewResponse._2, token1.tokenInfo().expiryTimestamp())
+    val renewResult = adminClient.renewDelegationToken(token1.hmac())
+    var expiryTimestamp = renewResult.expiryTimestamp().get()
 
-    //test describe tokens
-    val renewer2 = List(SecurityUtils.parseKafkaPrincipal("User:Renewer1"))
-    val tokenResult2 = adminClient.createToken(renewer2)
-    assertEquals(Errors.NONE, tokenResult2._1)
-    val token2 = tokenResult2._2
+    val describeResult = adminClient.describeDelegationToken()
+    val tokenId = token1.tokenInfo().tokenId()
+    token1 = describeResult.delegationTokens().get().asScala.filter(dt => dt.tokenInfo().tokenId() == tokenId).head
+    assertEquals(expiryTimestamp, token1.tokenInfo().expiryTimestamp())
 
-    assertTrue(adminClient.describeToken(null)._2.size == 2)
+    //test expire tokens
+    val expireResult1 = adminClient.expireDelegationToken(token1.hmac())
+    expiryTimestamp = expireResult1.expiryTimestamp().get()
 
-    //test expire tokens tokens
-    val expireResponse1 = adminClient.expireToken(token1.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse1._1)
+    val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
+    expiryTimestamp = expireResult2.expiryTimestamp().get()
 
-    val expireResponse2 = adminClient.expireToken(token2.hmacBuffer())
-    assertEquals(Errors.NONE, expireResponse2._1)
-
-    assertTrue(adminClient.describeToken(null)._2.size == 0)
+    tokens = adminClient.describeDelegationToken().delegationTokens().get()
+    assertTrue(tokens.isEmpty)
 
     //create token with invalid principal type
-    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer1"))
-    val tokenResult3 = adminClient.createToken(renewer3)
-    assertEquals(Errors.INVALID_PRINCIPAL_TYPE, tokenResult3._1)
-
+    val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
+    val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
+    assertTrue(intercept[ExecutionException](createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+    // try describing tokens for unknown owner
+    val unknownOwner = List(SecurityUtils.parseKafkaPrincipal("User:Unknown")).asJava
+    tokens = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(unknownOwner)).delegationTokens().get()
+    assertTrue(tokens.isEmpty)
   }
 
   @After
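
[Editor's note] For readers skimming the diff: the rewritten test above drives the delegation-token lifecycle through the new Java org.apache.kafka.clients.admin.AdminClient instead of the old Scala kafka.admin.AdminClient tuple-based API. Below is a minimal, hypothetical Java sketch of the same create/describe/renew/expire flow; the bootstrap address, class name, and the omitted SASL settings are placeholders, and the cluster is assumed to have delegation tokens enabled.

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

public class DelegationTokenLifecycleSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address; a real client also needs SASL security settings,
        // since delegation tokens require an authenticated connection.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a token; the listed renewer may later extend its lifetime.
            KafkaPrincipal renewer = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "renewer1");
            DelegationToken token = admin
                    .createDelegationToken(new CreateDelegationTokenOptions()
                            .renewers(Collections.singletonList(renewer)))
                    .delegationToken().get();

            // Describe returns all tokens visible to this principal.
            List<DelegationToken> tokens =
                    admin.describeDelegationToken().delegationTokens().get();
            System.out.println("tokens visible: " + tokens.size());

            // Renew extends the expiry; the new expiry timestamp (epoch ms) is returned.
            long newExpiry = admin.renewDelegationToken(token.hmac())
                    .expiryTimestamp().get();
            System.out.println("renewed until: " + newExpiry);

            // Expire with default options invalidates the token.
            admin.expireDelegationToken(token.hmac()).expiryTimestamp().get();
        }
    }
}
{code}
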
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 0561cacb8f8..8f8842bbe97 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -19,17 +19,15 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util
 
-import kafka.admin.AdminClient
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.SecurityUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert.assertTrue
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -58,23 +56,19 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
 
   @Test
   def testDelegationTokenRequests(): Unit = {
-    adminClient = AdminClient.create(createAdminConfig.asScala.toMap)
-
-    val renewer1 = List(SecurityUtils.parseKafkaPrincipal("User:" + JaasTestUtils.KafkaPlainUser))
-    val createResponse = adminClient.createToken(renewer1)
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createResponse._1)
+    adminClient = AdminClient.create(createAdminConfig)
 
-    val describeResponse = adminClient.describeToken(List())
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, describeResponse._1)
+    val createResult = adminClient.createDelegationToken()
+    assertTrue(intercept[ExecutionException](createResult.delegationToken().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
-    //test renewing tokens
-    val renewResponse = adminClient.renewToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, renewResponse._1)
+    val describeResult = adminClient.describeDelegationToken()
+    assertTrue(intercept[ExecutionException](describeResult.delegationTokens().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
-    //test expire tokens tokens
-    val expireResponse = adminClient.expireToken(ByteBuffer.wrap("".getBytes()))
-    assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, expireResponse._1)
+    val renewResult = adminClient.renewDelegationToken("".getBytes())
+    assertTrue(intercept[ExecutionException](renewResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
 
+    val expireResult = adminClient.expireDelegationToken("".getBytes())
+    assertTrue(intercept[ExecutionException](expireResult.expiryTimestamp().get()).getCause.isInstanceOf[DelegationTokenDisabledException])
   }
 
   @After
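
[Editor's note] A side note on the error-handling change visible in this file: where the old client returned an (Errors, value) tuple, the new AdminClient fails the result's KafkaFuture, so get() throws an ExecutionException wrapping the broker-side error - which is exactly what the intercept calls above unwrap. A hedged Java sketch of the same unwrapping follows; the class and helper names are made up for illustration, and "admin" is assumed to point at a SASL-enabled cluster with the token feature turned off.

{code:java}
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.DelegationTokenDisabledException;

public final class TokenErrorProbe {
    // Hypothetical helper: returns true if the broker rejected the request
    // because delegation tokens are disabled.
    static boolean tokenAuthDisabled(AdminClient admin) throws InterruptedException {
        try {
            admin.createDelegationToken().delegationToken().get();
            return false; // creation succeeded, so the feature is enabled
        } catch (ExecutionException e) {
            // KafkaFuture.get() wraps the broker-side error as the cause.
            return e.getCause() instanceof DelegationTokenDisabledException;
        }
    }
}
{code}
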
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 2a7d6d400d5..ed85415eacc 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -315,13 +315,13 @@ class RequestQuotaTest extends BaseRequestTest {
           new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
 
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
-          new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+          new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
           new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
 
         case ApiKeys.RENEW_DELEGATION_TOKEN =>
-          new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
+          new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DELETE_GROUPS =>
           new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> consumer poll(timeout) blocked infinitely when no available bootstrap server
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6783
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6783
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Qiang Zhao
>            Priority: Major
>              Labels: features
>             Fix For: 1.2.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> {code:java}
>     @Test
>     public void testPollWithAllBootstrapServersDown() throws Exception {
>         ExecutorService executor = Executors.newSingleThreadExecutor();
>         try {
>             final long pollTimeout = 1000;
>             final AtomicBoolean pollComplete = new AtomicBoolean();
>             executor.submit(new Runnable() {
>                 @Override
>                 public void run() {
>                     Properties props = new Properties();
>                     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
>                     try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
>                         consumer.subscribe(Arrays.asList(topic));
>                         try {
>                             consumer.poll(pollTimeout);
>                         } catch (Exception ex) {
>                             ex.printStackTrace();
>                         } finally {
>                             pollComplete.set(true);
>                         }
>                     }
>                 }
>             });
>             Thread.sleep(pollTimeout * 2);
>             Assert.assertTrue("poll timeout does not work when all servers are down", pollComplete.get());
>         } finally {
>             executor.shutdown();
>         }
>     }
> {code}
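
[Editor's note] The reproduction above hangs because the timeout passed to poll(long) only bounds the wait for records; with every bootstrap server unreachable, the consumer can block indefinitely in metadata and coordinator lookup before that timeout is ever consulted. Until the blocking is fixed, one way to bound the wait externally - purely an illustrative workaround, not the approach taken in the patch above - is KafkaConsumer.wakeup(), which is safe to call from another thread and makes a blocked poll() throw WakeupException. The topic name, group id, and deadline below are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Deliberately unreachable, mirroring the reproduction above.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            // Schedule a wakeup as a hard deadline for the blocked poll.
            ScheduledFuture<?> kick = watchdog.schedule(consumer::wakeup, 5, TimeUnit.SECONDS);
            try {
                consumer.poll(1000); // may block well past 1000 ms with no broker reachable
            } catch (WakeupException e) {
                // Woken by the watchdog: treat as "no records within the deadline".
            } finally {
                kick.cancel(false);
            }
        } finally {
            watchdog.shutdown();
        }
    }
}
{code}
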



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
